diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-08-22 10:40:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-22 10:40:58 +0200 |
commit | f816fdb94db2a8cd2cdbcb4f639caa5fad769a39 (patch) | |
tree | 358ca77f9869cb634f9cf2b03b4c55318b6beb9f /storage | |
parent | 49fc314d4366d0ef011aeff82d721679080f2d9f (diff) | |
parent | 15e28e09b7ef78530f3821a82a3743b9dcd3c284 (diff) |
Merge pull request #28086 from vespa-engine/vekterli/distributor-operation-cancelling
Implement edge-triggered distributor operation cancelling
Diffstat (limited to 'storage')
32 files changed, 854 insertions, 89 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp index 757a9329ea6..617401dd271 100644 --- a/storage/src/tests/distributor/check_condition_test.cpp +++ b/storage/src/tests/distributor/check_condition_test.cpp @@ -5,6 +5,7 @@ #include <vespa/document/fieldset/fieldsets.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> #include <vespa/storage/distributor/node_supported_features.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storage/distributor/operations/external/check_condition.h> #include <vespa/storage/distributor/persistence_operation_metric_set.h> #include <vespa/storageapi/message/persistence.h> @@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) { }); } +TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + cond.cancel(_sender, CancelScope::of_fully_cancelled()); + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED); + }); +} + +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); @@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_ }); } +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); @@ -255,6 +271,7 @@ 
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start }); } +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 9b5056f2066..b1cf1cbc636 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { ASSERT_EQ(entry->getNodeCount(), info.size()); EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); for (size_t i = 0; i < info.size(); ++i) { - EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo()) - << "Mismatching info for node " << i << ": " << info[i] << " vs " - << entry->getNode(i)->getBucketInfo(); + auto& node = entry->getNodeRef(i); + EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode(); } } @@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics } +TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged. 
Note that in a real scenario, the DB entry will have been removed + // as part of the ownership change, but there are already non-cancellation behaviors that + // avoid creating buckets from scratch in the DB if they do not exist, so just checking to + // see if the bucket exists or not risks hiding missing cancellation edge handling. + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); + // However, we still update our metrics if we _did_ remove documents on one or more nodes + EXPECT_EQ(70u, gc_removed_documents_metric()); +} + +TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_node_subset({0})); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged for node 0, changed for node 1 + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34)); +} + +TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({1})); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged for both nodes + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); +} + TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { auto op = create_op(); op->start(_sender); @@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in receive_phase1_replies_and_assert_no_phase_2_started(); } 
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) { + _op->cancel(_sender, CancelScope::of_fully_cancelled()); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) { + _op->cancel(_sender, CancelScope::of_node_subset({0})); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) { enable_two_phase_gc(); auto op = create_op(); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 76b6741442e..ee87fe84df6 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storage/distributor/operations/external/putoperation.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> @@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R _sender.getCommands(true, true, 4)); } +TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) { + setup_stripe(2, 2, "distributor:1 storage:2"); + + auto doc = createDummyDocument("test", "test"); + sendPut(createPut(doc)); + + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); + + op->cancel(_sender, CancelScope::of_fully_cancelled()); + sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1 + + // DB is not touched (note: normally node 1 would 
be removed at the cancel-edge). + ASSERT_EQ("BucketId(0x4000000000008f09) : " + "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), " + "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)", + dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); + // No new requests sent + ASSERT_EQ("", _sender.getCommands(true, true, 4)); +} + +TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) { + setup_stripe(2, 2, "distributor:1 storage:2"); + + auto doc = createDummyDocument("test", "test"); + sendPut(createPut(doc)); + + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); + + op->cancel(_sender, CancelScope::of_node_subset({0})); + sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1 + sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0 + + // Bucket info recheck only sent to node 1, as it's not cancelled + ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). 
) => 1", + _sender.getCommands(true, true, 4)); +} + TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cfg = make_config(); @@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck _sender.getLastReply()); } +TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) { + setup_stripe(2, 2, "storage:2 distributor:1"); + createAndSendSampleDocument(TIMEOUT); + + ASSERT_EQ("Put(BucketId(0x4000000000001dd4), " + "id:test:testdoctype1::, timestamp 100, size 45) => 0," + "Put(BucketId(0x4000000000001dd4), " + "id:test:testdoctype1::, timestamp 100, size 45) => 1", + _sender.getCommands(true, true)); + + op->cancel(_sender, CancelScope::of_fully_cancelled()); + + sendReply(0); + sendReply(1); + + ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(NONE)", + _sender.getLastReply()); +} + TEST_F(PutOperationTest, storage_failed) { setup_stripe(2, 1, "storage:1 distributor:1"); @@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { { std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); - std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); ASSERT_TRUE(sreply); sreply->remapBucketId(document::BucketId(17, 13)); @@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { dumpBucket(document::BucketId(17, 13))); } +// TODO make this redundant through operation cancelling TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) { setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); @@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ 
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); } +// TODO make this redundant through operation cancelling TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) { setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); @@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending // TODO probably also do this for updates and removes // TODO consider if we should use the pending state verbatim for computing targets if it exists +// TODO make this redundant through operation cancelling +// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this?? TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) { setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "test"); @@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state _sender.getLastReply(true)); } +TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + auto doc = createDummyDocument("test", "uri"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId()); + addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); + + sendPut(createPut(doc)); + + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2}); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + + // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless + // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path. 
+ { + std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); + auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); + ASSERT_TRUE(sreply); + sreply->remapBucketId(remap_bucket); + sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5)); + op->receive(_sender, reply); + } + + sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7)); + sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9)); + + EXPECT_EQ("NONEXISTING", dumpBucket(bucket)); + EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket)); +} + +TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + auto doc = createDummyDocument("test", "uri"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); + + sendPut(createPut(doc)); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true)); + + // Simulate nodes 0 and 2 going down + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2}); + // Cancelling shall be cumulative + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({2})); + + sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7)); + sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8)); + sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7)); + + EXPECT_EQ("BucketId(0x4000000000000593) : " + "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)", + dumpBucket(bucket)); +} + TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) { setup_stripe(Redundancy(2), NodeCount(2), "distributor:1 storage:2 .0.s:r .1.s:r"); @@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) { _sender.getLastReply()); } +TEST_F(PutOperationTest, 
ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill + // the write phase for all replicas. + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. 
Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) { setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1"); auto doc = createDummyDocument("test", "test"); diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index d169c80a95d..3fad2c194a2 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { std::unique_ptr<api::StorageReply> reply(removec->makeReply()); auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get()); removeR->setOldTimestamp(oldTimestamp); + removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5)); callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release())); } @@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err _sender.getLastReply()); } +// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops +// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests. + +TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 50, false, true)); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + reply_with(make_get_reply(1, 50, false, true)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. 
Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + +TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT)); + ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, 0, 50); + replyToMessage(*op, 1, 50); + + EXPECT_EQ("BucketId(0x4000000000000593) : " + "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(bucketId)); + // Reply is still OK since the operation went through on the content nodes + ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)", + _sender.getLastReply()); + +} + TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) { ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index da32225cde3..1907335545a 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -4,16 +4,16 @@ #include <vespa/config/helper/configgetter.h> #include <vespa/document/base/testdocrepo.h> #include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/update/arithmeticvalueupdate.h> -#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_stripe.h> #include 
<vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> +#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storageapi/message/persistence.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> #include <gmock/gmock.h> namespace storage::distributor { @@ -30,8 +30,9 @@ using namespace ::testing; struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { document::TestDocRepo _testRepo; std::shared_ptr<const DocumentTypeRepo> _repo; - const DocumentType* _doc_type; + const DocumentType* _doc_type{nullptr}; DistributorMessageSenderStub _sender; + BucketId _bucket_id{0x400000000000cac4}; TwoPhaseUpdateOperationTest(); ~TwoPhaseUpdateOperationTest() override; @@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { void checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const; - std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); + static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); void SetUp() override { _repo = _testRepo.getTypeRepoSp(); @@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { close(); } - void replyToMessage(Operation& callback, - DistributorMessageSenderStub& sender, - uint32_t index, - uint64_t oldTimestamp, - api::ReturnCode::Result result = api::ReturnCode::OK); + static void replyToMessage( + Operation& callback, + DistributorMessageSenderStub& sender, + uint32_t index, + uint64_t oldTimestamp, + api::ReturnCode::Result result = api::ReturnCode::OK); - void replyToPut( + static void replyToPut( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void replyToCreateBucket( + static void replyToCreateBucket( Operation& callback, DistributorMessageSenderStub& sender, uint32_t 
index, @@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void reply_to_metadata_get( + static void reply_to_metadata_get( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& trace_msg = ""); - void reply_to_get_with_tombstone( + static void reply_to_get_with_tombstone( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { Timestamp highest_get_timestamp, Timestamp expected_response_timestamp); - std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { - setup_stripe(2, 2, "storage:2 distributor:1"); + void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas); + + void enable_3phase_updates(bool enable = true) { auto cfg = make_config(); - cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase); + cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable); configure_stripe(cfg); + } + + std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(enable_3phase); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. 
cb->start(_sender); return cb; @@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<PutCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); + dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5)); if (!traceMsg.empty()) { MBUS_TRACE(reply->getTrace(), 1, traceMsg); } - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, update->setCreateIfNonExistent(options._createIfNonExistent); document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId()); + assert(id == _bucket_id); document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId()); if (bucketState.length()) { @@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste _sender.getLastReply(true)); } +TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=1/2/3"); + op->start(_sender); + + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); + + 
replyToMessage(*op, _sender, 0, 110); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, _sender, 1, 110); + + // Client operation itself should return success since the update went through on all replica nodes + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 110) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + void TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const @@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) { EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1); } +void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); + op->start(_sender); + + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*op, _sender, 0, 70); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO custom cancellation failure metric? 
+} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true); +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false); +} + TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { setup_stripe(2, 2, "storage:2 distributor:1"); // Create update for wrong doctype which will fail the update. @@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) { EXPECT_EQ(1, m.ok.getValue()); } +TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 2, 2000U); + ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO cancellation metrics? 
+} + +TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + replyToGet(*op, _sender, 2, 2000U); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0}); + op->cancel(_sender, CancelScope::of_node_subset({0})); + + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3)); + replyToPut(*op, _sender, 3); + replyToPut(*op, _sender, 4); + + // Update itself is ACKed + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 2000) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + // But cancelled replicas are not reintroduced into the bucket DB + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + + TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) { auto cb = set_up_2_inconsistent_replicas_and_start_update(); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more. cb->start(_sender); // Add new replica to deterministic test bucket after gets have been sent - BucketId bucket(0x400000000000cac4); // Always the same in the test. 
- addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3"); Timestamp old_timestamp = 500; ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index d0ae31b9524..fefc88a27c2 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -11,7 +11,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> using config::ConfigGetter; using config::FileSpec; @@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil { std::shared_ptr<const DocumentTypeRepo> _repo; const DocumentType* _html_type; + UpdateOperationTest() + : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>:: + getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))), + _html_type(_repo->getDocumentType("text/html")) + { + } + void SetUp() override { - _repo.reset( - new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>:: - getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))); - _html_type = _repo->getDocumentType("text/html"); createLinks(); } @@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest EXPECT_EQ(2, m.diverging_timestamp_updates.getValue()); } +// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops +// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests. 
+ +TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3"); + DistributorMessageSenderStub sender; + op->start(sender); + + ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true)); + + // Simulate nodes 0 and 2 going down + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2}); + // Cancelling shall be cumulative + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({2})); + + replyToMessage(*op, sender, 0, 120); + replyToMessage(*op, sender, 1, 120); + replyToMessage(*op, sender, 2, 120); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)", + dumpBucket(_bId)); +} + } diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp index a8c21efa793..d2ff7b53403 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp @@ -142,7 +142,7 @@ BucketInfo::addNode(const BucketCopy& newCopy, const std::vector<uint16_t>& reco bool BucketInfo::removeNode(unsigned short node, TrustedUpdate update) { - for (auto iter = _nodes.begin(); iter != _nodes.end(); iter++) { + for (auto iter = _nodes.begin(); iter != _nodes.end(); ++iter) { if (iter->getNode() == node) { _nodes.erase(iter); if (update == TrustedUpdate::UPDATE) { diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 184fee5d2c9..c889afcc77c 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -10,6 +10,7 @@ vespa_add_library(storage_distributor OBJECT bucket_spaces_stats_provider.cpp bucketgctimecalculator.cpp 
bucketlistmerger.cpp + cancelled_replicas_pruner.cpp clusterinformation.cpp crypto_uuid_generator.cpp distributor_bucket_space.cpp diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp new file mode 100644 index 00000000000..f453a722d2c --- /dev/null +++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "cancelled_replicas_pruner.h" + +namespace storage::distributor { + +std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope) { + if (cancel_scope.fully_cancelled()) { + return {}; + } + std::vector<BucketCopy> pruned_replicas; + // Expect that there will be an input entry for each cancelled node in the common case. + pruned_replicas.reserve((replicas.size() >= cancel_scope.cancelled_nodes().size()) + ? replicas.size() - cancel_scope.cancelled_nodes().size() : 0); + for (auto& candidate : replicas) { + if (!cancel_scope.node_is_cancelled(candidate.getNode())) { + pruned_replicas.emplace_back(candidate); + } + } + return pruned_replicas; +} + +} diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h new file mode 100644 index 00000000000..f12f78e569f --- /dev/null +++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <vespa/storage/bucketdb/bucketcopy.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> +#include <span> +#include <vector> + +namespace storage::distributor { + +/** + * Returns a new vector that contains all entries of `replicas` whose nodes are _not_ tagged as + * cancelled in `cancel_scope`. Returned entry ordering is identical to input ordering. + */ +[[nodiscard]] std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope); + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 243b3c5ecb2..ad1cce46bea 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -281,6 +281,7 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state enterRecoveryMode(); // Clear all active messages on nodes that are down. + // TODO this should also be done on nodes that are no longer part of the config! 
const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE); for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp index 7b7c9f431f7..c92544c8cb5 100644 --- a/storage/src/vespa/storage/distributor/operationowner.cpp +++ b/storage/src/vespa/storage/distributor/operationowner.cpp @@ -73,7 +73,7 @@ OperationOwner::onClose() void OperationOwner::erase(api::StorageMessage::Id msgId) { - _sentMessageMap.pop(msgId); + (void)_sentMessageMap.pop(msgId); } } diff --git a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt index 5c6a1f3d84c..8cf0470f674 100644 --- a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_distributoroperation OBJECT SOURCES + cancel_scope.cpp operation.cpp DEPENDS ) diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp new file mode 100644 index 00000000000..af62b369517 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp @@ -0,0 +1,52 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include "cancel_scope.h" + +namespace storage::distributor { + +CancelScope::CancelScope() + : _cancelled_nodes(), + _fully_cancelled(false) +{ +} + +CancelScope::CancelScope(fully_cancelled_ctor_tag) noexcept + : _cancelled_nodes(), + _fully_cancelled(true) +{ +} + +CancelScope::CancelScope(CancelledNodeSet nodes) noexcept + : _cancelled_nodes(std::move(nodes)), + _fully_cancelled(false) +{ +} + +CancelScope::~CancelScope() = default; + +CancelScope::CancelScope(const CancelScope&) = default; +CancelScope& CancelScope::operator=(const CancelScope&) = default; + +CancelScope::CancelScope(CancelScope&&) noexcept = default; +CancelScope& CancelScope::operator=(CancelScope&&) noexcept = default; + +void CancelScope::add_cancelled_node(uint16_t node) { + _cancelled_nodes.insert(node); +} + +void CancelScope::merge(const CancelScope& other) { + _fully_cancelled |= other._fully_cancelled; + // Not using iterator insert(first, last) since that explicitly resizes; insert nodes one by one instead. + for (uint16_t node : other._cancelled_nodes) { + _cancelled_nodes.insert(node); + } +} + +CancelScope CancelScope::of_fully_cancelled() noexcept { + return CancelScope(fully_cancelled_ctor_tag{}); +} + +CancelScope CancelScope::of_node_subset(CancelledNodeSet nodes) noexcept { + return CancelScope(std::move(nodes)); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.h b/storage/src/vespa/storage/distributor/operations/cancel_scope.h new file mode 100644 index 00000000000..7619a64d39f --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.h @@ -0,0 +1,62 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once + +#include <vespa/vespalib/stllike/hash_set.h> + +namespace storage::distributor { + +/** + * In the face of concurrent cluster state changes, cluster topology reconfigurations etc., + * it's possible for there to be pending mutating operations to nodes that the distributor + * no longer should keep track of. Such operations must therefore be _cancelled_, either + * fully or partially. A CancelScope represents the granularity at which an operation should + * be cancelled. + * + * In the case of one or more nodes becoming unavailable, `fully_cancelled()` will be false + * and `node_is_cancelled(x)` will return whether node `x` is explicitly cancelled. + * + * In the case of ownership transfers, `fully_cancelled()` will be true since the distributor + * should no longer have any knowledge of the bucket. `node_is_cancelled(x)` is always + * implicitly true for all values of `x` for full cancellations. + */ +class CancelScope { +public: + using CancelledNodeSet = vespalib::hash_set<uint16_t>; +private: + CancelledNodeSet _cancelled_nodes; + bool _fully_cancelled; + + struct fully_cancelled_ctor_tag {}; + + explicit CancelScope(fully_cancelled_ctor_tag) noexcept; + explicit CancelScope(CancelledNodeSet nodes) noexcept; +public: + CancelScope(); + ~CancelScope(); + + CancelScope(const CancelScope&); + CancelScope& operator=(const CancelScope&); + + CancelScope(CancelScope&&) noexcept; + CancelScope& operator=(CancelScope&&) noexcept; + + void add_cancelled_node(uint16_t node); + void merge(const CancelScope& other); + + [[nodiscard]] bool fully_cancelled() const noexcept { return _fully_cancelled; } + [[nodiscard]] bool is_cancelled() const noexcept { + return (_fully_cancelled || !_cancelled_nodes.empty()); + } + [[nodiscard]] bool node_is_cancelled(uint16_t node) const noexcept { + return (fully_cancelled() || _cancelled_nodes.contains(node)); + } + + [[nodiscard]] const CancelledNodeSet& cancelled_nodes() const noexcept { + return _cancelled_nodes; + } 
+ + static CancelScope of_fully_cancelled() noexcept; + static CancelScope of_node_subset(CancelledNodeSet nodes) noexcept; +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp index 0e12e3e3019..bd7f3709575 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp @@ -104,7 +104,12 @@ CheckCondition::handle_reply(DistributorStripeMessageSender& sender, } } -void CheckCondition::cancel(DistributorStripeMessageSender& sender) { +void CheckCondition::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); + _cond_get_op->cancel(proxy_sender, cancel_scope); +} + +void CheckCondition::close(DistributorStripeMessageSender& sender) { IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); _cond_get_op->onClose(proxy_sender); // We don't propagate any generated reply from the GetOperation, as its existence @@ -163,6 +168,12 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St reply->steal_trace()); return; } + if (_cond_get_op->is_cancelled()) { + _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED, + "Operation has been cancelled (likely due to a cluster state change)"), + reply->steal_trace()); + return; + } auto state_version_now = _bucket_space.getClusterState().getVersion(); if (_bucket_space.has_pending_cluster_state()) { state_version_now = _bucket_space.get_pending_cluster_state().getVersion(); diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h index 999b79adc3d..92a8bc62ae6 100644 --- 
a/storage/src/vespa/storage/distributor/operations/external/check_condition.h +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h @@ -17,6 +17,7 @@ namespace storage::api { class StorageReply; } namespace storage::distributor { +class CancelScope; class DistributorBucketSpace; class DistributorNodeContext; class DistributorStripeMessageSender; @@ -122,7 +123,8 @@ public: void start_and_send(DistributorStripeMessageSender& sender); void handle_reply(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& reply); - void cancel(DistributorStripeMessageSender& sender); + void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope); + void close(DistributorStripeMessageSender& sender); [[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept { return _outcome; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 854e7d15f82..e7832fd19e5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -109,6 +109,8 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const { // TODO handle this explicitly as part of operation abort/cancel edge + // -> we have yet to send anything at this point + // -> shouldn't ExternalOperationHandler deal with this before starting the op? 
auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace()); if (!pending_state) { return false; @@ -245,6 +247,15 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen _msg = std::shared_ptr<api::PutCommand>(); } +void +PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) +{ + if (_check_condition) { + _check_condition->cancel(sender, cancel_scope); + } + _tracker.cancel(cancel_scope); +} + bool PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const { @@ -302,7 +313,7 @@ void PutOperation::onClose(DistributorStripeMessageSender& sender) { if (_check_condition) { - _check_condition->cancel(sender); + _check_condition->close(sender); } _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 635accc1865..8b8e3e15375 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -60,6 +60,8 @@ private: void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch); + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; + [[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const; [[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const; diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 42d8e318f47..be43aac3d9e 100644 --- 
a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -156,9 +156,21 @@ void RemoveOperation::on_completed_check_condition(CheckCondition::Outcome& outc void RemoveOperation::onClose(DistributorStripeMessageSender& sender) { + if (_check_condition) { + _check_condition->close(sender); + } _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } +void +RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) +{ + if (_check_condition) { + _check_condition->cancel(sender, cancel_scope); + } + _tracker.cancel(cancel_scope); +} + bool RemoveOperation::has_condition() const noexcept { return _msg->hasTestAndSetCondition(); } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index 9f3a98294ea..221def81fdc 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -29,6 +29,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; void onClose(DistributorStripeMessageSender& sender) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; private: PersistenceMessageTrackerImpl _tracker_instance; diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 73c65f54b21..2d1c469d072 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -13,6 +13,7 @@ #include 
<vespa/storage/distributor/distributor_bucket_space_repo.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/vdslib/state/cluster_state_bundle.h> +#include <vespa/vespalib/stllike/hash_set.hpp> #include <cinttypes> #include <vespa/log/log.h> @@ -68,10 +69,8 @@ TwoPhaseUpdateOperation::stateToString(SendState state) noexcept case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT"; case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT"; case SendState::PUTS_SENT: return "PUTS_SENT"; - default: - assert(!"Unknown state"); - return ""; } + abort(); } void @@ -130,7 +129,7 @@ TwoPhaseUpdateOperation::get_bucket_database_entries() const } bool -TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const +TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) { // Fast path iff bucket exists AND is consistent (split and copies). if (entries.size() != 1) { @@ -245,6 +244,16 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorStripeM } void +TwoPhaseUpdateOperation::send_operation_cancelled_reply(DistributorStripeMessageSender& sender) +{ + sendReplyWithResult(sender, + api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, + "The update operation was cancelled due to a cluster state change " + "between executing the read and write phases of a write-repair " + "update")); +} + +void TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorStripeMessageSender& sender) { sendReplyWithResult(sender, @@ -257,7 +266,8 @@ void TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc, api::Timestamp putTimestamp, DistributorStripeMessageSender& sender) { - if (lostBucketOwnershipBetweenPhases()) { + assert(!is_cancelled()); + if (lostBucketOwnershipBetweenPhases()) { // TODO deprecate with cancellation sendLostOwnershipTransientErrorReply(sender); return; } @@ -281,6 +291,8 @@ 
TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen void TwoPhaseUpdateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { + // In the case of cancellations, we let existing operations complete, but must not + // start new ones that are unaware of the cancellations. if (_mode == Mode::FAST_PATH) { handleFastPathReceive(sender, msg); } else { @@ -304,7 +316,10 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s sendReplyWithResult(sender, getReply.getResult()); return; } - + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } if (!getReply.getDocument().get()) { // Weird, document is no longer there ... Just fail. sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "")); @@ -316,7 +331,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId()); assert(callback.get()); - Operation & callbackOp = *callback; + Operation& callbackOp = *callback; IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender); callbackOp.receive(intermediate, msg); @@ -326,12 +341,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s addTraceFromReply(*intermediate._reply); auto& cb = dynamic_cast<UpdateOperation&>(callbackOp); - std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation(); + auto [newest_bucket, newest_node] = cb.getNewestTimestampLocation(); auto intermediate_update_reply = std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply); assert(intermediate_update_reply); if (!intermediate_update_reply->getResult().success() || - bestNode.first == document::BucketId(0)) + (newest_bucket == document::BucketId(0))) { if (intermediate_update_reply->getResult().success() && (intermediate_update_reply->getOldTimestamp() == 0)) @@ 
-343,9 +358,14 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s } else { LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str()); + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } + _updateReply = std::move(intermediate_update_reply); - _fast_path_repair_source_node = bestNode.second; - document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first); + _fast_path_repair_source_node = newest_node; + document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_bucket); auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), document::AllFields::NAME); copyMessageSettings(*_updateCmd, *cmd); @@ -383,7 +403,7 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorStripeMessageSender& s callbackOp.receive(intermediate, msg); if (!intermediate._reply.get()) { - return; // Not enough replies received yet or we're draining callbacks. + return; // Not enough replies received yet, or we're draining callbacks. } addTraceFromReply(*intermediate._reply); if (_sendState == SendState::METADATA_GETS_SENT) { @@ -445,6 +465,13 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get( "One or more metadata Get operations failed; aborting Update")); return; } + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } + // Replicas being _removed_ is handled by cancellation, but a concurrent state change may happen + // that _adds_ one or more available content nodes, which we cannot then blindly write to. + // So we have to explicitly check this edge case. if (!replica_set_unchanged_after_get_operation()) { // Use BUCKET_NOT_FOUND to trigger a silent retry.
LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str()); @@ -490,6 +517,10 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende sendReplyWithResult(sender, reply.getResult()); return; } + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } // Single Get could technically be considered consistent with itself, so make // sure we never treat that as sufficient for restarting in the fast path. if ((_sendState != SendState::SINGLE_GET_SENT) && may_restart_with_fast_path(reply)) { @@ -558,7 +589,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender) { LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode", update_doc_id().c_str()); - if (lostBucketOwnershipBetweenPhases()) { + assert(!is_cancelled()); + if (lostBucketOwnershipBetweenPhases()) { // TODO remove once cancellation is wired sendLostOwnershipTransientErrorReply(sender); return; } @@ -579,7 +611,7 @@ TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorStripeMessageSen std::unique_ptr<document::select::Node> selection; try { - selection = _parser.parse_selection(_updateCmd->getCondition().getSelection()); + selection = _parser.parse_selection(_updateCmd->getCondition().getSelection()); } catch (const document::select::ParsingFailedException & e) { sendReplyWithResult(sender, api::ReturnCode( api::ReturnCode::ILLEGAL_PARAMETERS, @@ -679,6 +711,22 @@ TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) { } } +void +TwoPhaseUpdateOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + // We have to explicitly cancel any and all pending Operation instances that have been + // launched by this operation. 
This is to ensure any DB updates they may transitively + // perform are aware of all cancellations that have occurred. + // There may be many messages pending for any given operation, so unique-ify them first + // to avoid duplicate cancellation invocations. + vespalib::hash_set<Operation*> ops; + for (auto& msg_op : _sentMessageMap) { + ops.insert(msg_op.second.get()); + } + for (auto* op : ops) { + op->cancel(sender, cancel_scope); + } +} + vespalib::string TwoPhaseUpdateOperation::update_doc_id() const { assert(_updateCmd.get() != nullptr); return _updateCmd->getDocumentId().toString(); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index d2ad5359fa6..7f64bb8d56c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -71,6 +71,8 @@ public: void onClose(DistributorStripeMessageSender& sender) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; + private: enum class SendState { NONE_SENT, @@ -94,19 +96,20 @@ private: void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&); void ensureUpdateReplyCreated(); - std::vector<BucketDatabase::Entry> get_bucket_database_entries() const; - bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const; + [[nodiscard]] std::vector<BucketDatabase::Entry> get_bucket_database_entries() const; + [[nodiscard]] static bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries); void startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries); void startSafePathUpdate(DistributorStripeMessageSender&); - bool lostBucketOwnershipBetweenPhases() const; + [[nodiscard]] bool lostBucketOwnershipBetweenPhases()
const; void sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender&); + void send_operation_cancelled_reply(DistributorStripeMessageSender& sender); void send_feed_blocked_error_reply(DistributorStripeMessageSender& sender); void schedulePutsWithUpdatedDocument( std::shared_ptr<document::Document>, api::Timestamp, DistributorStripeMessageSender&); void applyUpdateToDocument(document::Document&) const; - std::shared_ptr<document::Document> createBlankDocument() const; + [[nodiscard]] std::shared_ptr<document::Document> createBlankDocument() const; void setUpdatedForTimestamp(api::Timestamp); void handleFastPathReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&); @@ -120,20 +123,20 @@ private: void handle_safe_path_received_single_full_get(DistributorStripeMessageSender&, api::GetReply&); void handleSafePathReceivedGet(DistributorStripeMessageSender&, api::GetReply&); void handleSafePathReceivedPut(DistributorStripeMessageSender&, const api::PutReply&); - bool shouldCreateIfNonExistent() const; + [[nodiscard]] bool shouldCreateIfNonExistent() const; bool processAndMatchTasCondition( DistributorStripeMessageSender& sender, const document::Document& candidateDoc); - bool satisfiesUpdateTimestampConstraint(api::Timestamp) const; + [[nodiscard]] bool satisfiesUpdateTimestampConstraint(api::Timestamp) const; void addTraceFromReply(api::StorageReply& reply); - bool hasTasCondition() const noexcept; + [[nodiscard]] bool hasTasCondition() const noexcept; void replyWithTasFailure(DistributorStripeMessageSender& sender, vespalib::stringref message); bool may_restart_with_fast_path(const api::GetReply& reply); - bool replica_set_unchanged_after_get_operation() const; + [[nodiscard]] bool replica_set_unchanged_after_get_operation() const; void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender); // Precondition: reply has not yet been sent. 
- vespalib::string update_doc_id() const; + [[nodiscard]] vespalib::string update_doc_id() const; using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>; diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index f43a6092372..60bddebbb89 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -207,6 +207,13 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender) _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } +void +UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) +{ + _tracker.cancel(cancel_scope); +} + + // The backend behavior of "create-if-missing" updates is to return the timestamp of the // _new_ update operation if the document was created from scratch. 
The two-phase update // operation logic auto-detects unexpected inconsistencies and tries to reconcile diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 96fd878a324..7d2131d426d 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -31,6 +31,7 @@ public: std::string getStatus() const override { return ""; }; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; void onClose(DistributorStripeMessageSender& sender) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const { return _newestTimestampLocation; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 2e6d0e95ec9..bf64fa2eb82 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "garbagecollectionoperation.h" +#include <vespa/storage/distributor/cancelled_replicas_pruner.h> #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/distributor/top_level_distributor.h> @@ -22,6 +23,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu _cluster_state_version_at_phase1_start_time(0), _remove_candidates(), _replica_info(), + _cancel_scope(), _max_documents_removed(0), _is_done(false) {} @@ -148,6 +150,10 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender, } } +void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) { + _cancel_scope.merge(cancel_scope); +} + void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(), from_node, reply.getBucketInfo()); @@ -186,6 +192,11 @@ bool GarbageCollectionOperation::may_start_write_phase() const { if (!_ok) { return false; // Already broken, no reason to proceed. } + if (is_cancelled()) { + LOG(debug, "GC(%s): not sending write phase; operation has been explicitly cancelled", + getBucket().toString().c_str()); + return false; + } const auto state_version_now = _bucketSpace->getClusterState().getVersion(); if ((state_version_now != _cluster_state_version_at_phase1_start_time) || _bucketSpace->has_pending_cluster_state()) @@ -250,9 +261,17 @@ void GarbageCollectionOperation::update_last_gc_timestamp_in_db() { } void GarbageCollectionOperation::merge_received_bucket_info_into_db() { - // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. 
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info); - update_last_gc_timestamp_in_db(); + if (_cancel_scope.is_cancelled()) { + if (_cancel_scope.fully_cancelled()) { + return; + } + _replica_info = prune_cancelled_nodes(_replica_info, _cancel_scope); + } + if (!_replica_info.empty()) { + // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. + _manager->operation_context().update_bucket_database(getBucket(), _replica_info); + update_last_gc_timestamp_in_db(); + } // else: effectively fully cancelled, no touching the DB. } void GarbageCollectionOperation::update_gc_metrics() { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 27dc519dcc2..97efbe694de 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -3,10 +3,11 @@ #include "idealstateoperation.h" #include <vespa/document/base/documentid.h> +#include <vespa/persistence/spi/id_and_timestamp.h> #include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> #include <vespa/storage/distributor/operation_sequencer.h> -#include <vespa/persistence/spi/id_and_timestamp.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/vespalib/stllike/hash_map.h> #include <vector> @@ -22,13 +23,14 @@ public: void onStart(DistributorStripeMessageSender& sender) override; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; const char* getName() const noexcept override { return "garbagecollection"; }; Type getType() const noexcept override { return 
GARBAGE_COLLECTION; } bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; - bool is_two_phase() const noexcept { + [[nodiscard]] bool is_two_phase() const noexcept { return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase)); } - bool is_done() const noexcept { return _is_done; } + [[nodiscard]] bool is_done() const noexcept { return _is_done; } protected: MessageTracker _tracker; @@ -54,13 +56,14 @@ private: RemoveCandidates _remove_candidates; std::vector<SequencingHandle> _gc_write_locks; std::vector<BucketCopy> _replica_info; + CancelScope _cancel_scope; uint32_t _max_documents_removed; bool _is_done; static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply); void send_current_phase_remove_locations(DistributorStripeMessageSender& sender); - std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const; + [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const; void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply); void handle_ok_phase1_reply(api::RemoveLocationReply& reply); diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp index 4d82de170ae..9f944a94178 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/operation.cpp @@ -12,7 +12,8 @@ LOG_SETUP(".distributor.callback"); namespace storage::distributor { Operation::Operation() - : _startTime() + : _startTime(), + _cancelled(false) { } @@ -45,6 +46,11 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo target.setPriority(source.getPriority()); } +void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + _cancelled = true; + on_cancel(sender, cancel_scope); +} + void Operation::on_blocked() { diff --git 
a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index bc7e510a5b6..64caacfc642 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -16,6 +16,7 @@ class StorageComponent; namespace distributor { +class CancelScope; class DistributorStripeOperationContext; class PendingMessageTracker; class OperationSequencer; @@ -40,7 +41,7 @@ public: on the owner of the message that was replied to. */ virtual void receive(DistributorStripeMessageSender& sender, - const std::shared_ptr<api::StorageReply> & msg) + const std::shared_ptr<api::StorageReply> & msg) { onReceive(sender, msg); } @@ -60,6 +61,22 @@ public: void start(DistributorStripeMessageSender& sender); /** + * Explicitly cancel the operation. Cancelled operations may or may not (depending on + * the operation implementation) be immediately aborted, but they should either way + * never insert any bucket information _for cancelled nodes_ into the bucket DB after + * cancel() has been called. + */ + void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope); + + /** + * Whether cancel() has been invoked at least once on this instance. This does not + * distinguish between cancellations caused by ownership transfers and those caused + * by nodes becoming unavailable; Operation implementations that care about this need + * to implement cancel() themselves and inspect the provided CancelScope. + */ + [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; } + + /** * Returns true if we are blocked to start this operation given * the pending messages. 
*/ @@ -93,8 +110,15 @@ private: const std::shared_ptr<api::StorageReply> & msg) = 0; protected: + virtual void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + (void)sender; + (void)cancel_scope; + } + static constexpr vespalib::duration MAX_TIMEOUT = 3600s; + vespalib::system_time _startTime; + bool _cancelled; }; } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index a4295613fd2..498f3a5feab 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -1,10 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistencemessagetracker.h" +#include "cancelled_replicas_pruner.h" #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" #include <vespa/vdslib/distribution/distribution.h> #include <vespa/storageapi/message/persistence.h> +#include <algorithm> #include <vespa/log/log.h> LOG_SETUP(".persistencemessagetracker"); @@ -18,12 +20,15 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( DistributorStripeOperationContext& op_ctx, api::Timestamp revertTimestamp) : MessageTracker(node_ctx), + _remapBucketInfo(), + _bucketInfo(), _metric(metric), _reply(std::move(reply)), _op_ctx(op_ctx), _revertTimestamp(revertTimestamp), _trace(_reply->getTrace().getLevel()), _requestTimer(node_ctx.clock()), + _cancel_scope(), _n_persistence_replies_total(0), _n_successful_persistence_replies(0), _priority(_reply->getPriority()), @@ -34,8 +39,32 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default; void +PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope) +{ + _cancel_scope.merge(cancel_scope); +} + +void 
+PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present( + BucketInfoMap& bucket_and_replicas, + const CancelScope& cancel_scope) +{ + for (auto& info : bucket_and_replicas) { + info.second = prune_cancelled_nodes(info.second, cancel_scope); + } +} + +void PersistenceMessageTrackerImpl::updateDB() { + if (_cancel_scope.is_cancelled()) { + if (_cancel_scope.fully_cancelled()) { + return; // Fully cancelled ops cannot mutate the DB at all + } + prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope); + prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope); + } + for (const auto & entry : _bucketInfo) { _op_ctx.update_bucket_database(entry.first, entry.second); } @@ -229,12 +258,19 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r _success = false; } +bool +PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept +{ + return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case +} + void PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node) { LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node); if (!reply.getResult().success() - && reply.getResult().getResult() != api::ReturnCode::EXISTS) + && (reply.getResult().getResult() != api::ReturnCode::EXISTS) + && !node_is_effectively_cancelled(node)) { LOG(spam, "Create bucket reply failed, so deleting it from bucket db"); // We don't know if the bucket exists at this point, so we remove it from the DB. 
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 9b06547dd98..8c44d70062c 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -4,6 +4,7 @@ #include "distributor_stripe_component.h" #include "distributormetricsset.h" #include "messagetracker.h" +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/storageapi/messageapi/bucketinfocommand.h> #include <vespa/storageapi/messageapi/bucketinforeply.h> @@ -14,6 +15,7 @@ struct PersistenceMessageTracker { virtual ~PersistenceMessageTracker() = default; using ToSend = MessageTracker::ToSend; + virtual void cancel(const CancelScope& cancel_scope) = 0; virtual void fail(MessageSender&, const api::ReturnCode&) = 0; virtual void queueMessageBatch(std::vector<ToSend> messages) = 0; virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0; @@ -26,14 +28,9 @@ struct PersistenceMessageTracker { }; class PersistenceMessageTrackerImpl final - : public PersistenceMessageTracker, - public MessageTracker + : public PersistenceMessageTracker, + public MessageTracker { -private: - using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>; - BucketInfoMap _remapBucketInfo; - BucketInfoMap _bucketInfo; - public: PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric, std::shared_ptr<api::BucketInfoReply> reply, @@ -42,6 +39,8 @@ public: api::Timestamp revertTimestamp = 0); ~PersistenceMessageTrackerImpl() override; + void cancel(const CancelScope& cancel_scope) override; + void updateDB(); void updateMetrics(); [[nodiscard]] bool success() const noexcept { return _success; } @@ -67,8 +66,11 @@ public: void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override; private: - using MessageBatch = 
std::vector<uint64_t>; + using MessageBatch = std::vector<uint64_t>; + using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>; + BucketInfoMap _remapBucketInfo; + BucketInfoMap _bucketInfo; std::vector<MessageBatch> _messageBatches; PersistenceOperationMetricSet& _metric; std::shared_ptr<api::BucketInfoReply> _reply; @@ -77,20 +79,24 @@ private: std::vector<BucketNodePair> _revertNodes; mbus::Trace _trace; framework::MilliSecTimer _requestTimer; + CancelScope _cancel_scope; uint32_t _n_persistence_replies_total; uint32_t _n_successful_persistence_replies; uint8_t _priority; bool _success; - bool canSendReplyEarly() const; + static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas, + const CancelScope& cancel_scope); + [[nodiscard]] bool canSendReplyEarly() const; void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply); void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const; - bool hasSentReply() const noexcept { return !_reply; } - bool shouldRevert() const; - bool has_majority_successful_replies() const noexcept; - bool has_minority_test_and_set_failure() const noexcept; + [[nodiscard]] bool hasSentReply() const noexcept { return !_reply; } + [[nodiscard]] bool shouldRevert() const; + [[nodiscard]] bool has_majority_successful_replies() const noexcept; + [[nodiscard]] bool has_minority_test_and_set_failure() const noexcept; void sendReply(MessageSender& sender); void updateFailureResult(const api::BucketInfoReply& reply); + [[nodiscard]] bool node_is_effectively_cancelled(uint16_t node) const noexcept; void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node); void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node); void transfer_trace_state_to_reply(); diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h index 70bee311f78..951ed6a6877 100644 --- 
a/storage/src/vespa/storage/distributor/sentmessagemap.h +++ b/storage/src/vespa/storage/distributor/sentmessagemap.h @@ -10,19 +10,23 @@ class Operation; class SentMessageMap { public: + using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>; + SentMessageMap(); ~SentMessageMap(); - std::shared_ptr<Operation> pop(api::StorageMessage::Id id); - std::shared_ptr<Operation> pop(); + [[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id); + [[nodiscard]] std::shared_ptr<Operation> pop(); void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg); void clear(); - uint32_t size() const { return _map.size(); } + [[nodiscard]] uint32_t size() const { return _map.size(); } [[nodiscard]] bool empty() const noexcept { return _map.empty(); } std::string toString() const; + + Map::const_iterator begin() const noexcept { return _map.cbegin(); } + Map::const_iterator end() const noexcept { return _map.cend(); } private: - using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>; Map _map; }; |