diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-08-22 15:00:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-22 15:00:25 +0200 |
commit | 9afa884388804986f5c22223ab13f1649da9bc68 (patch) | |
tree | eb4d8d51ee1d8919c91cb83fa8cc9062fab91648 /storage | |
parent | 5a9875b516d43bfa367b5f3083f9bc58e7d8f795 (diff) | |
parent | 54693fc154c0fabae6ac82607765a22057977bbb (diff) |
Merge pull request #23701 from vespa-engine/vekterli/two-phase-document-gc
Add support for two-phase document garbage collection [run-systemtest]
Diffstat (limited to 'storage')
29 files changed, 955 insertions, 135 deletions
diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h index 421fe2216ca..ba552217e07 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.h +++ b/storage/src/tests/distributor/distributor_message_sender_stub.h @@ -106,6 +106,11 @@ public: return *_operation_sequencer; } + distributor::OperationSequencer& operation_sequencer() noexcept override { + assert(_operation_sequencer); + return *_operation_sequencer; + } + void set_operation_sequencer(distributor::OperationSequencer& op_seq) { _operation_sequencer = &op_seq; } diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 709f2e6cdc5..91b43ebce16 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -191,6 +191,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { configure_stripe(builder); } + void configure_enable_two_phase_garbage_collection(bool use_two_phase) { + ConfigBuilder builder; + builder.enableTwoPhaseGarbageCollection = use_two_phase; + configure_stripe(builder); + } + bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { return _stripe->_scheduler->implicitly_clear_priority_on_schedule(); } @@ -1012,4 +1018,18 @@ TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_ EXPECT_FALSE(getConfig().use_unordered_merge_chaining()); } +TEST_F(DistributorStripeTest, enable_two_phase_gc_config_is_propagated_to_internal_config) +{ + setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + // Feature is currently disabled by default. TODO change once we roll it out. + EXPECT_FALSE(getConfig().enable_two_phase_garbage_collection()); + + configure_enable_two_phase_garbage_collection(true); + EXPECT_TRUE(getConfig().enable_two_phase_garbage_collection()); + + configure_enable_two_phase_garbage_collection(false); + EXPECT_FALSE(getConfig().enable_two_phase_garbage_collection()); +} + } diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 07ef4604e20..4f9fd25098b 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -11,33 +11,83 @@ #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; +using document::BucketId; +using document::DocumentId; +using document::FixedBucketSpaces; using namespace ::testing; namespace storage::distributor { struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { + BucketId _bucket_id; + OperationSequencer _operation_sequencer; + uint32_t _gc_start_time_sec; + spi::IdAndTimestamp _e1; + spi::IdAndTimestamp _e2; + spi::IdAndTimestamp _e3; + spi::IdAndTimestamp _e4; + + + GarbageCollectionOperationTest() + : _bucket_id(16, 1), + _operation_sequencer(), + _gc_start_time_sec(34), + _e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)), + _e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)), + _e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)), + _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400)) + {} + void SetUp() override { createLinks(); - enable_cluster_state("distributor:1 storage:2"); - addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300"); + enable_cluster_state("version:10 distributor:1 storage:2"); + addNodesToBucketDB(_bucket_id, "0=250/50/300,1=250/50/300"); auto cfg = make_config(); cfg->setGarbageCollection("music.date < 34", 3600s); configure_stripe(cfg); - getClock().setAbsoluteTimeInSeconds(34); + getClock().setAbsoluteTimeInSeconds(_gc_start_time_sec); + _sender.set_operation_sequencer(_operation_sequencer); }; void TearDown() override { close(); } + void enable_two_phase_gc() { + NodeSupportedFeatures with_two_phase; + with_two_phase.two_phase_remove_location = true; + set_node_supported_features(0, with_two_phase); + set_node_supported_features(1, with_two_phase); + + config_enable_two_phase_gc(true); + } + + void config_enable_two_phase_gc(bool enabled) { + auto config = make_config(); + config->set_enable_two_phase_garbage_collection(enabled); + configure_stripe(std::move(config)); + } + std::shared_ptr<GarbageCollectionOperation> create_op() { auto op = std::make_shared<GarbageCollectionOperation>( - dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + dummy_cluster_context, BucketAndNodes(makeDocumentBucket(_bucket_id), toVector<uint16_t>(0, 1))); op->setIdealStateManager(&getIdealStateManager()); return op; } + static std::shared_ptr<api::RemoveLocationCommand> as_remove_location_command(const std::shared_ptr<api::StorageCommand>& cmd) { + auto msg = std::dynamic_pointer_cast<api::RemoveLocationCommand>(cmd); + assert(msg); + return msg; + } + + static std::shared_ptr<api::RemoveLocationReply> make_remove_location_reply(api::StorageCommand& msg) { + auto reply = std::shared_ptr<api::StorageReply>(msg.makeReply()); + assert(reply->getType() == api::MessageType::REMOVELOCATION_REPLY); + return std::dynamic_pointer_cast<api::RemoveLocationReply>(reply); + } + // FIXME fragile to assume that send order == node index, but that's the way it currently works void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum, uint32_t n_docs_removed) { @@ -51,8 +101,14 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { op.receive(_sender, reply); } + void assert_bucket_last_gc_timestamp_is(uint32_t gc_time) { + BucketDatabase::Entry entry = getBucket(_bucket_id); + ASSERT_TRUE(entry.valid()); + EXPECT_EQ(entry->getLastGarbageCollectionTime(), gc_time); + } + void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) { - BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1)); + BucketDatabase::Entry entry = getBucket(_bucket_id); ASSERT_TRUE(entry.valid()); ASSERT_EQ(entry->getNodeCount(), info.size()); EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); @@ -69,11 +125,21 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { assert(gc_metrics); return gc_metrics->documents_removed.getValue(); } + + void assert_gc_op_completed_ok_without_second_phase(GarbageCollectionOperation& op) { + ASSERT_EQ(0u, _sender.commands().size()); + EXPECT_TRUE(op.is_done()); + EXPECT_TRUE(op.ok()); // It's not a failure to have nothing to do + // GC timestamp must be updated so we can move on to another bucket. + EXPECT_NO_FATAL_FAILURE(assert_bucket_last_gc_timestamp_is(_gc_start_time_sec)); + EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed + } }; -TEST_F(GarbageCollectionOperationTest, simple) { +TEST_F(GarbageCollectionOperationTest, simple_legacy) { auto op = create_op(); op->start(_sender, framework::MilliSecTime(0)); + EXPECT_FALSE(op->is_two_phase()); ASSERT_EQ(2, _sender.commands().size()); EXPECT_EQ(0u, gc_removed_documents_metric()); @@ -119,4 +185,261 @@ TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_s ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34)); } +TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and_explicit_node_support) { + NodeSupportedFeatures with_two_phase; + with_two_phase.two_phase_remove_location = true; + set_node_supported_features(1, with_two_phase); + + config_enable_two_phase_gc(true); + + // Config enabled, but only 1 node says it supports two-phase RemoveLocation + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + EXPECT_FALSE(op->is_two_phase()); + + // Node 0 suddenly upgraded...! + set_node_supported_features(0, with_two_phase); + op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + EXPECT_TRUE(op->is_two_phase()); + + // But doesn't matter if two-phase GC is config-disabled + config_enable_two_phase_gc(false); + + op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + EXPECT_FALSE(op->is_two_phase()); +} + +TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_locations_with_provided_gc_pri) { + enable_two_phase_gc(); + auto op = create_op(); + op->setPriority(getConfig().getMaintenancePriorities().garbageCollection); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + for (int i : {0, 1}) { + auto cmd = as_remove_location_command(_sender.command(i)); + EXPECT_TRUE(cmd->only_enumerate_docs()); + EXPECT_EQ(cmd->getPriority(), getConfig().getMaintenancePriorities().garbageCollection); + } +} + +TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1, _e2, _e3}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1 + + _sender.commands().clear(); + op->receive(_sender, r1); + ASSERT_EQ(0u, _sender.commands().size()); // No phase 2 yet, must get reply from all nodes + op->receive(_sender, r2); + ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent + + std::vector<spi::IdAndTimestamp> expected({_e2, _e3}); + for (int i : {0, 1}) { + auto cmd = as_remove_location_command(_sender.command(i)); + EXPECT_FALSE(cmd->only_enumerate_docs()); + EXPECT_EQ(cmd->explicit_remove_set(), expected); + EXPECT_EQ(cmd->getPriority(), getConfig().default_external_feed_priority()); + } +} + +TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + auto r2 = make_remove_location_reply(*_sender.command(1)); + _sender.commands().clear(); + // Empty result sets in both replies + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); +} + +TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + // No docs in common + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e2}); + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); +} + +// We explicitly test the case where the first reply has an empty result set since we internally +// establish the baseline candidate set from the first reply. This test case leaks some internal +// implementation details, but such is life. +TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e1, _e2, _e3, _e4}); + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); +} + + +TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1, _e2, _e3}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1 + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent + + r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_documents_removed(3); + r1->setBucketInfo(api::BucketInfo(0x1234, 90, 500)); + + r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_documents_removed(3); + r2->setBucketInfo(api::BucketInfo(0x4567, 90, 500)); + + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_TRUE(op->ok()); + EXPECT_TRUE(op->is_done()); + EXPECT_EQ(3u, gc_removed_documents_metric()); + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(0x1234, 90, 500), + api::BucketInfo(0x4567, 90, 500)}, + _gc_start_time_sec)); +} + +struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationTest { + std::shared_ptr<GarbageCollectionOperation> _op; + std::shared_ptr<api::RemoveLocationReply> _r1; + std::shared_ptr<api::RemoveLocationReply> _r2; + + void SetUp() override { + GarbageCollectionOperationTest::SetUp(); + + enable_two_phase_gc(); + _op = create_op(); + _op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + _r1 = make_remove_location_reply(*_sender.command(0)); + _r1->set_selection_matches({_e1}); + _r2 = make_remove_location_reply(*_sender.command(1)); + _r2->set_selection_matches({_e1}); + } + + void receive_phase1_replies() { + _sender.commands().clear(); + _op->receive(_sender, _r1); + _op->receive(_sender, _r2); + } + + void receive_phase1_replies_and_assert_no_phase_2_started() { + receive_phase1_replies(); + ASSERT_EQ(0u, _sender.commands().size()); + EXPECT_TRUE(_op->is_done()); + EXPECT_FALSE(_op->ok()); + // GC not completed, so timestamp/bucket DB are _not_ updated + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), // test init values + api::BucketInfo(250, 50, 300)}, + 0/*GC start timestamp*/)); + EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed + } +}; + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_failure_during_first_phase) { + _r2->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "oh no")); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_cluster_state_version_changed_between_phases) { + enable_cluster_state("version:11 distributor:1 storage:2"); // version 10 -> 11 + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_pending_cluster_state_between_phases) { + simulate_set_pending_cluster_state("version:11 distributor:1 storage:2"); // Pending; not enabled yet + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_inconsistently_split_between_phases) { + // Add a logical child of _bucket_id to the bucket tree. This implies an inconsistent split, as we never + // want to have a tree with buckets in inner node positions, only in leaves. + addNodesToBucketDB(BucketId(17, 1), "0=250/50/300,1=250/50/300"); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1, _e2, _e3}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e1, _e2, _e3}); + + // Grab a lock on e2 to simulate a concurrent write to the document. + auto e2_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e2.id); + ASSERT_TRUE(e2_lock.valid()); + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + + // Locks on e1 and e3 are held while GC removes are sent + auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id); + EXPECT_FALSE(e1_lock.valid()); + auto e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id); + EXPECT_FALSE(e3_lock.valid()); + + std::vector<spi::IdAndTimestamp> expected({_e1, _e3}); // e2 not included in remove set + for (int i : {0, 1}) { + auto cmd = as_remove_location_command(_sender.command(i)); + EXPECT_EQ(cmd->explicit_remove_set(), expected); + } + + // Locks are implicitly released when the underlying operation is destroyed + op.reset(); + e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id); + EXPECT_TRUE(e1_lock.valid()); + e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id); + EXPECT_TRUE(e3_lock.valid()); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index 014827cb03c..19ec51f4ed4 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -2511,6 +2511,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_ for (auto* s : stripes) { for (uint16_t i : {0, 1, 2}) { EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining); + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).two_phase_remove_location); } } @@ -2518,10 +2519,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_ for (uint32_t i = 0; i < _sender.commands().size(); i++) { ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), dummy_buckets_to_return, [i](auto& reply) noexcept { - // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported. + // Pretend nodes 1 and 2 are on a shiny version with support for new features. // Node 0 does not support the fanciness. if (i > 0) { reply.supported_node_features().unordered_merge_chaining = true; + reply.supported_node_features().two_phase_remove_location = true; } })); } @@ -2529,8 +2531,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_ // Node features should be propagated to all stripes for (auto* s : stripes) { EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining); + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).two_phase_remove_location); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).two_phase_remove_location); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).two_phase_remove_location); } } diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index a02167bb08a..04ab5ad0cf4 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -10,6 +10,7 @@ #include <vespa/document/fieldvalue/intfieldvalue.h> using document::test::makeDocumentBucket; +using document::DocumentId; using namespace ::testing; namespace storage { @@ -32,7 +33,7 @@ TEST_F(ProcessAllHandlerTest, change_of_repos_is_reflected) { EXPECT_EQ(newDocRepo.get(), &getEnv().getDocumentTypeRepo()); } -TEST_F(ProcessAllHandlerTest, remove_location) { +TEST_F(ProcessAllHandlerTest, legacy_remove_location) { document::BucketId bucketId(16, 4); doPut(4, spi::Timestamp(1234)); doPut(4, spi::Timestamp(2345)); @@ -54,7 +55,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) { EXPECT_EQ(2u, reply->documents_removed()); } -TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { +TEST_F(ProcessAllHandlerTest, legacy_remove_location_document_subset) { document::BucketId bucketId(16, 4); AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory); @@ -89,6 +90,95 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { EXPECT_EQ(5u, reply->documents_removed()); } +TEST_F(ProcessAllHandlerTest, remove_location_with_enumerate_only_returns_match_set_only) { + document::BucketId bucketId(16, 4); + AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, + *_sequenceTaskExecutor, _bucketIdFactory); + + document::TestDocMan docMan; + for (int i = 0; i < 10; ++i) { + document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i)); + doc->setValue(doc->getField("headerval"), document::IntFieldValue(i)); + doPut(doc, bucketId, spi::Timestamp(100 + i)); + } + + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); + cmd->set_only_enumerate_docs(true); + auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + // Enumeration is synchronous, so we get the reply in the _tracker_, not on the reply queue. + ASSERT_TRUE(tracker->hasReply()); + auto* reply = dynamic_cast<api::RemoveLocationReply*>(&tracker->getReply()); + ASSERT_TRUE(reply); + EXPECT_EQ(0u, reply->documents_removed()); + + // No docs should be removed (remove flag is all zero) + EXPECT_EQ("DocEntry(100, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n" + "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" + "DocEntry(102, 0, Doc(id:mail:testdoctype1:n=4:62608.html))\n" + "DocEntry(103, 0, Doc(id:mail:testdoctype1:n=4:26566.html))\n" + "DocEntry(104, 0, Doc(id:mail:testdoctype1:n=4:56061.html))\n" + "DocEntry(105, 0, Doc(id:mail:testdoctype1:n=4:20019.html))\n" + "DocEntry(106, 0, Doc(id:mail:testdoctype1:n=4:49514.html))\n" + "DocEntry(107, 0, Doc(id:mail:testdoctype1:n=4:13472.html))\n" + "DocEntry(108, 0, Doc(id:mail:testdoctype1:n=4:42967.html))\n" + "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n", + dumpBucket(bucketId)); + + std::vector<spi::IdAndTimestamp> expected = { + {DocumentId("id:mail:testdoctype1:n=4:3619.html"), spi::Timestamp(100)}, + {DocumentId("id:mail:testdoctype1:n=4:62608.html"), spi::Timestamp(102)}, + {DocumentId("id:mail:testdoctype1:n=4:56061.html"), spi::Timestamp(104)}, + {DocumentId("id:mail:testdoctype1:n=4:49514.html"), spi::Timestamp(106)}, + {DocumentId("id:mail:testdoctype1:n=4:42967.html"), spi::Timestamp(108)}, + }; + EXPECT_EQ(reply->selection_matches(), expected); +} + +TEST_F(ProcessAllHandlerTest, remove_location_with_remove_set_only_removes_listed_docs) { + document::BucketId bucketId(16, 4); + AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, + *_sequenceTaskExecutor, _bucketIdFactory); + + document::TestDocMan docMan; + for (int i = 0; i < 10; ++i) { + document::Document::SP doc(docMan.createRandomDocumentAtLocation(4, 1234 + i)); + doc->setValue(doc->getField("headerval"), document::IntFieldValue(i)); + doPut(doc, bucketId, spi::Timestamp(100 + i)); + } + + document::Bucket bucket = makeDocumentBucket(bucketId); + // Use a selection that, if naively used, removes everything. + auto cmd = std::make_shared<api::RemoveLocationCommand>("true", bucket); + std::vector<spi::IdAndTimestamp> to_remove = { + {DocumentId("id:mail:testdoctype1:n=4:62608.html"), spi::Timestamp(102)}, + {DocumentId("id:mail:testdoctype1:n=4:49514.html"), spi::Timestamp(106)}, + {DocumentId("id:mail:testdoctype1:n=4:42967.html"), spi::Timestamp(108)}, + }; + cmd->set_explicit_remove_set(std::move(to_remove)); + auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + // Actually removing the documents is asynchronous, so the response will be on the queue. + std::shared_ptr<api::StorageMessage> msg; + ASSERT_TRUE(_replySender.queue.getNext(msg, 60s)); + + // Remove flag toggled for the entries provided in the command + EXPECT_EQ("DocEntry(100, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n" + "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" + "DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n" + "DocEntry(103, 0, Doc(id:mail:testdoctype1:n=4:26566.html))\n" + "DocEntry(104, 0, Doc(id:mail:testdoctype1:n=4:56061.html))\n" + "DocEntry(105, 0, Doc(id:mail:testdoctype1:n=4:20019.html))\n" + "DocEntry(106, 1, id:mail:testdoctype1:n=4:49514.html)\n" + "DocEntry(107, 0, Doc(id:mail:testdoctype1:n=4:13472.html))\n" + "DocEntry(108, 1, id:mail:testdoctype1:n=4:42967.html)\n" + "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n", + dumpBucket(bucketId)); + + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg); + ASSERT_TRUE(reply); + EXPECT_EQ(3u, reply->documents_removed()); +} + TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_type) { document::BucketId bucketId(16, 4); doPut(4, spi::Timestamp(1234)); diff --git a/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE b/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE deleted file mode 100644 index 5045a98b037..00000000000 --- a/storage/src/tests/storageapi/mbusprot/mbusprot.4.2.serialization.V_4_2_STABLE +++ /dev/null @@ -1,66 +0,0 @@ - -MessageType(10, Put) -\00\00\00 -\00\00\00j\00\08\00\00\00ddoc:test:test\00\05testdoctype1\00\00\01\00\00\00=\00\01\05\00= ;This is the contents of the test document. -It ain't much. -\00@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(11, Put Reply, reply of Put) -\00\00\00\0b\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(82, Update) -\00\00\00R\00\00\005\00\08doc:test:test\00\01testdoctype1\00\00\01\00\00\00\01\00\00\00\02\00\00\00\01\00\00\10\1b\01\00\00\00\11@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00 -\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(83, Update Reply, reply of Update) -\00\00\00S\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\0e\00\00\00\00\00\00\00\08\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(4, Get) -\00\00\00\04\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00{\01\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(5, Get Reply, reply of Get) -\00\00\00\05\00\00\00j\00\08\00\00\00ddoc:test:test\00\05testdoctype1\00\00\01\00\00\00=\00\01\05\00= ;This is the contents of the test document. -It ain't much. -\00\00\00\00\00\00\00\00d\00\00\00\0ddoc:test:test\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(12, Remove) -\00\00\00\0c\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\00\9f\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(13, Remove Reply, reply of Remove) -\00\00\00\0d\00\00\00\0ddoc:test:test@\00\00\00\00\00\00Q\00\00\00\00\00\00\000\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(14, Revert) -\00\00\00\0e@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00;\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(15, Revert Reply, reply of Revert) -\00\00\00\0f@\00\00\00\00\00\00Q\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(54, Request bucket info) -\00\00\006\00\00\00\00\00\03\00\00\00\14distributor:3 .1.s:d\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(55, Request bucket info reply, reply of Request bucket info) -\00\00\007\00\00\00\01\00\00\00\00\00\00\00\04\00\00\00+\00\00\00\18\00\00\00{\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(56, Notify bucket change) -\00\00\008P\00\00\00\00\00\03\e8\00\00\00\02\00\00\00\03\00\00\00\04\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(57, Notify bucket change reply, reply of Notify bucket change) -\00\00\009\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(26, Create bucket) -\00\00\00\1a\00\00\00\00\00\00\02o\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(27, Create bucket reply, reply of Create bucket) -\00\00\00\1b\00\00\00\00\00\00\02o\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(34, Delete bucket) -\00\00\00"\00\00\00\00\00\00\02o\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(35, Delete bucket reply, reply of Delete bucket) -\00\00\00#\00\00\00\00\00\00\02o\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(32, Merge bucket) -\00\00\00 \00\00\00\00\00\00\02o\00\03\00\04\00\00\0d\01\00\1a\01\00\00\00\00\00\00\04\d2\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(33, Merge bucket reply, reply of Merge bucket) -\00\00\00!\00\00\00\00\00\00\02o\00\03\00\04\00\00\0d\01\00\1a\01\00\00\00\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\04\00\00\00 -\00\00\00d -MessageType(50, GetBucketDiff) -\00\00\002\00\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\00\00\00\00\04 \00\00\00\01\00\00\00\00\00\01\e2@\00\0c1234567890ab\00\00\00d\00\01\00\00\00\01\00\03\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(51, GetBucketDiff reply, reply of GetBucketDiff) -\00\00\003\00\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\00\00\00\00\04 \00\00\00\01\00\00\00\00\00\01\e2@\00\0c1234567890ab\00\00\00d\00\01\00\00\00\01\00\03\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(52, ApplyBucketDiff) -\00\00\004@\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\0c\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(53, ApplyBucketDiff reply, reply of ApplyBucketDiff) -\00\00\005@\00\00\00\00\00\02o\00\02\00\04\00\00\0d\00\00\00\04\d2\00\00\00\01\00\00\00\00\00\00\00\00\00\0c\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(66, SplitBucket) -\00\00\00B@\00\00\00\00\00\00\00\14(\00\00\03\e8\00\00\00\05\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(67, SplitBucket reply, reply of SplitBucket) -\00\00\00C@\00\00\00\00\00\00\00\00\00\00\02D\00\00\00\00\00\00\00\00\00\00d\00\00\03\e8\00\00'\10D\00\00\00\00\00\00\01\00\00\00e\00\00\03\e9\00\00'\11\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01 -MessageType(18, Visitor Create) -\00\00\00\12\00\00\00\07library\00\00\00\02id\00\00\00\0ddoc selection\00\00\00\01\00\00\00\0bcontroldest\00\00\00\08datadest\00\00\00\02\00\00\00\00\00\00\00{\00\00\00\00\00\00\01\c8\00\00\00\02@\00\00\00\00\00\00\01@\00\00\00\00\00\00\02\00\01\01\00\00\00d\00\00\00\03\00\00\00\0dinto darkness\00\00\00\09bind them\00\00\00\08one ring\00\00\00\10to rule them all\00\00\00\0bone ring to\00\00\00\0dfind them and\00\00\00\00\00\00\00\00\01\ff\ff -MessageType(19, Visitor Create Reply, reply of Visitor Create) -\00\00\00\13\00\00\00\01\00\00\00\00\00\00\00\00\00\00\00\00\01 -It ain't much. -\00\00P\00\00\f1\f1\f1\f1\f1\00\00\00\00\00\00\00\00\01\ff\ff diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index d4d22d6a36c..5ed2e0d96b4 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -387,11 +387,8 @@ TEST_P(StorageProtocolTest, request_bucket_info) { // separately until we can figure out if this is by design or not. EXPECT_EQ(lastMod, entries[0]._info.getLastModified()); - if (GetParam().getMajor() >= 7) { - EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining); - } else { - EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining); - } + EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining); + EXPECT_TRUE(reply2->supported_node_features().two_phase_remove_location); } } @@ -530,16 +527,57 @@ TEST_P(StorageProtocolTest, destroy_visitor) { auto reply2 = copyReply(reply); } -TEST_P(StorageProtocolTest, remove_location) { +TEST_P(StorageProtocolTest, legacy_remove_location) { + auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection()); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_TRUE(cmd2->explicit_remove_set().empty()); + EXPECT_FALSE(cmd2->only_enumerate_docs()); + + uint32_t n_docs_removed = 12345; + auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed); + auto reply2 = copyReply(reply); + EXPECT_EQ(n_docs_removed, reply2->documents_removed()); + EXPECT_TRUE(reply2->selection_matches().empty()); +} + +TEST_P(StorageProtocolTest, phase_1_remove_location) { + auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket); + cmd->set_only_enumerate_docs(true); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection()); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_TRUE(cmd2->explicit_remove_set().empty()); + EXPECT_TRUE(cmd2->only_enumerate_docs()); + + auto reply = std::make_shared<RemoveLocationReply>(*cmd2, 0); + std::vector<spi::IdAndTimestamp> docs; + docs.emplace_back(DocumentId("id:foo:bar::baz"), spi::Timestamp(12345)); + docs.emplace_back(DocumentId("id:foo:bar::zoid"), spi::Timestamp(67890)); + reply->set_selection_matches(docs); + auto reply2 = copyReply(reply); + EXPECT_EQ(0, reply2->documents_removed()); + EXPECT_EQ(reply2->selection_matches(), docs); +} + +TEST_P(StorageProtocolTest, phase_2_remove_location) { auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket); + std::vector<spi::IdAndTimestamp> docs; + docs.emplace_back(DocumentId("id:foo:bar::baz"), spi::Timestamp(12345)); + docs.emplace_back(DocumentId("id:foo:bar::zoid"), spi::Timestamp(67890)); + cmd->set_explicit_remove_set(docs); auto cmd2 = copyCommand(cmd); EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection()); EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_FALSE(cmd2->only_enumerate_docs()); + EXPECT_EQ(cmd2->explicit_remove_set(), docs); uint32_t n_docs_removed = 12345; auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed); auto reply2 = copyReply(reply); EXPECT_EQ(n_docs_removed, reply2->documents_removed()); + EXPECT_TRUE(reply2->selection_matches().empty()); } TEST_P(StorageProtocolTest, stat_bucket) { diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index b4c23725493..c83c5cdd245 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -52,6 +52,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _implicitly_clear_priority_on_schedule(false), _use_unordered_merge_chaining(false), _inhibit_default_merges_when_global_merges_pending(false), + _enable_two_phase_garbage_collection(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -175,6 +176,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule; _use_unordered_merge_chaining = config.useUnorderedMergeChaining; _inhibit_default_merges_when_global_merges_pending = config.inhibitDefaultMergesWhenGlobalMergesPending; + _enable_two_phase_garbage_collection = config.enableTwoPhaseGarbageCollection; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index b26f115827e..a18770ac69f 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -119,6 +119,8 @@ public: const MaintenancePriorities& getMaintenancePriorities() const { return _maintenancePriorities; } + + uint8_t default_external_feed_priority() const noexcept { return 120; } /** @see setSplitCount @@ -279,6 +281,12 @@ public: [[nodiscard]] bool inhibit_default_merges_when_global_merges_pending() const noexcept { return _inhibit_default_merges_when_global_merges_pending; } + void set_enable_two_phase_garbage_collection(bool enable) noexcept { + _enable_two_phase_garbage_collection = enable; + } + [[nodiscard]] bool enable_two_phase_garbage_collection() const noexcept { + return _enable_two_phase_garbage_collection; + } uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; } @@ -338,6 +346,7 @@ private: bool _implicitly_clear_priority_on_schedule; bool _use_unordered_merge_chaining; bool _inhibit_default_merges_when_global_merges_pending; + bool _enable_two_phase_garbage_collection; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 0a858ba37c3..e363f53a4ea 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -299,3 +299,11 @@ use_unordered_merge_chaining bool default=true ## one or more nodes is in maintenance mode in the default bucket space but marked up in ## the global bucket space. inhibit_default_merges_when_global_merges_pending bool default=true + +## If true, garbage collection is performed in two phases (metadata gathering and deletion) +## instead of just a single phase. Two-phase GC allows for ensuring the same set of documents +## is deleted across all nodes and explicitly takes write locks on the distributor to prevent +## concurrent feed ops to GC'd documents from potentially creating inconsistencies. +## Two-phase GC is only used iff all replica content nodes support the feature AND it's enabled +## by this config. +enable_two_phase_garbage_collection bool default=false diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 49b062d6cc1..d1366bc0285 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -93,6 +93,10 @@ public: return *_operation_sequencer; } + OperationSequencer& operation_sequencer() noexcept override { + return *_operation_sequencer; + } + const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override; /** diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h index 198427078d6..ef0252661f3 100644 --- a/storage/src/vespa/storage/distributor/distributormessagesender.h +++ b/storage/src/vespa/storage/distributor/distributormessagesender.h @@ -29,6 +29,7 @@ public: virtual PendingMessageTracker& getPendingMessageTracker() = 0; virtual const PendingMessageTracker& getPendingMessageTracker() const = 0; virtual const OperationSequencer& operation_sequencer() const noexcept = 0; + virtual OperationSequencer& operation_sequencer() noexcept = 0; }; } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 9ce8d871fc3..0ada9da29c1 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -63,6 +63,9 @@ public: const OperationSequencer& operation_sequencer() const noexcept override { abort(); // Never called by the messages using this component. } + OperationSequencer& operation_sequencer() noexcept override { + abort(); // Never called by the messages using this component. + } }; ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ctx, diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h index 647e063f93e..87b2a9aef8e 100644 --- a/storage/src/vespa/storage/distributor/node_supported_features.h +++ b/storage/src/vespa/storage/distributor/node_supported_features.h @@ -11,11 +11,10 @@ namespace storage::distributor { * are initially expected to be unsupported. */ struct NodeSupportedFeatures { - bool unordered_merge_chaining = false; + bool unordered_merge_chaining = false; + bool two_phase_remove_location = false; - bool operator==(const NodeSupportedFeatures& rhs) const noexcept { - return unordered_merge_chaining == rhs.unordered_merge_chaining; - }; + bool operator==(const NodeSupportedFeatures& rhs) const noexcept = default; }; } diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h index d4afdf2cce7..c43d9f05fc9 100644 --- a/storage/src/vespa/storage/distributor/operationowner.h +++ b/storage/src/vespa/storage/distributor/operationowner.h @@ -55,6 +55,10 @@ public: return _sender.operation_sequencer(); } + OperationSequencer& operation_sequencer() noexcept override { + return _sender.operation_sequencer(); + } + private: OperationOwner& _owner; DistributorStripeMessageSender& _sender; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index e91df1da51e..74ff8371fbe 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -36,6 +36,8 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx, { } +PutOperation::~PutOperation() = default; + void PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive, const api::StorageCommand& originalCommand, diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 61e4678496f..7ec81d6570d 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -29,6 +29,7 @@ public: std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle = SequencingHandle()); + ~PutOperation() override; void onStart(DistributorStripeMessageSender& sender) override; const char* getName() const override { return "put"; }; diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index af3b727e07b..7c04636dc50 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -97,6 +97,10 @@ struct IntermediateMessageSender : DistributorStripeMessageSender { const OperationSequencer& operation_sequencer() const noexcept override { return forward.operation_sequencer(); } + + OperationSequencer& operation_sequencer() noexcept override { + return forward.operation_sequencer(); + } }; IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 7f66d1effd5..a30be1293cd 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -5,44 +5,110 @@ #include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/node_supported_features_repo.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/vespalib/stllike/hash_set.hpp> +#include <algorithm> #include <vespa/log/log.h> -LOG_SETUP(".distributor.operation.idealstate.remove"); +LOG_SETUP(".distributor.operation.idealstate.gc"); namespace storage::distributor { GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& cluster_ctx, const BucketAndNodes& nodes) : IdealStateOperation(nodes), _tracker(cluster_ctx), + _phase(Phase::NotStarted), + _cluster_state_version_at_phase1_start_time(0), + _phase1_replies_received(0), + _remove_candidate_set(), _replica_info(), - _max_documents_removed(0) + _max_documents_removed(0), + _is_done(false) {} GarbageCollectionOperation::~GarbageCollectionOperation() = default; -void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) { +const char* GarbageCollectionOperation::to_string(Phase phase) noexcept { + switch (phase) { + case Phase::NotStarted: return "NotStarted"; + case Phase::LegacySinglePhase: return "LegacySinglePhase"; + case Phase::ReadMetadataPhase: return "ReadMetadataPhase"; + case Phase::WriteRemovesPhase: return "WriteRemovesPhase"; + default: abort(); + } +} + +bool GarbageCollectionOperation::all_involved_nodes_support_two_phase_gc() const noexcept { + const auto& features_repo = _manager->operation_context().node_supported_features_repo(); + for (uint16_t node : getNodes()) { + if (!features_repo.node_supported_features(node).two_phase_remove_location) { + return false; + } + } + return true; +} + +std::vector<spi::IdAndTimestamp> GarbageCollectionOperation::compile_phase_two_send_set() const { + std::vector<spi::IdAndTimestamp> docs_to_remove(_remove_candidate_set.begin(), _remove_candidate_set.end()); + // Use timestamp order to provide test determinism and allow for backend linear merging (if needed). + // Tie-break on GID upon collisions (which technically should never happen...!) + auto ts_then_gid_order = [](const spi::IdAndTimestamp& lhs, const spi::IdAndTimestamp& rhs) noexcept { + if (lhs.timestamp != rhs.timestamp) { + return (lhs.timestamp < rhs.timestamp); + } + return (lhs.id.getGlobalId() < rhs.id.getGlobalId()); + }; + std::sort(docs_to_remove.begin(), docs_to_remove.end(), ts_then_gid_order); + return docs_to_remove; +} + +void GarbageCollectionOperation::send_current_phase_remove_locations(DistributorStripeMessageSender& sender) { BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); std::vector<uint16_t> nodes = entry->getNodes(); + auto docs_to_remove = compile_phase_two_send_set(); // Always empty unless in phase 2 of two-phase GC - for (auto node : nodes) { + for (size_t i = 0; i < nodes.size(); ++i) { auto command = std::make_shared<api::RemoveLocationCommand>( _manager->operation_context().distributor_config().getGarbageCollectionSelection(), getBucket()); - - command->setPriority(_priority); - _tracker.queueCommand(command, node); + if (_phase == Phase::ReadMetadataPhase) { + command->set_only_enumerate_docs(true); + } else if (_phase == Phase::WriteRemovesPhase) { + if (i < nodes.size() - 1) { + command->set_explicit_remove_set(docs_to_remove); + } else { + command->set_explicit_remove_set(std::move(docs_to_remove)); + } + } // else: legacy command + command->setPriority((_phase != Phase::WriteRemovesPhase) + ? _priority + : _manager->operation_context().distributor_config().default_external_feed_priority()); + _tracker.queueCommand(command, nodes[i]); } - _tracker.flushQueue(sender); +} +void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) { + if (_manager->operation_context().distributor_config().enable_two_phase_garbage_collection() && + all_involved_nodes_support_two_phase_gc()) + { + _cluster_state_version_at_phase1_start_time = _bucketSpace->getClusterState().getVersion(); + LOG(debug, "Starting first phase of two-phase GC for %s at cluster state version %u", + getBucket().toString().c_str(), _cluster_state_version_at_phase1_start_time); + transition_to(Phase::ReadMetadataPhase); + } else { + LOG(debug, "Starting legacy single-phase GC for %s", getBucket().toString().c_str()); + transition_to(Phase::LegacySinglePhase); + } + send_current_phase_remove_locations(sender); if (_tracker.finished()) { done(); } } void -GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&, +GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& reply) { auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get()); @@ -51,33 +117,154 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&, uint16_t node = _tracker.handleReply(*rep); if (!rep->getResult().failed()) { - _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(), - node, rep->getBucketInfo()); - _max_documents_removed = std::max(_max_documents_removed, rep->documents_removed()); + if (_phase == Phase::LegacySinglePhase) { + handle_ok_legacy_reply(node, *rep); + } else if (_phase == Phase::ReadMetadataPhase) { + handle_ok_phase1_reply(*rep); + } else { + assert(_phase == Phase::WriteRemovesPhase); + handle_ok_phase2_reply(node, *rep); + } } else { _ok = false; } if (_tracker.finished()) { + const bool op_complete = (!_ok || _phase == Phase::LegacySinglePhase || _phase == Phase::WriteRemovesPhase); if (_ok) { - merge_received_bucket_info_into_db(); + if (op_complete) { + merge_received_bucket_info_into_db(); + } else { + assert(_phase == Phase::ReadMetadataPhase); + on_metadata_read_phase_done(sender); + } + } + if (op_complete) { + mark_operation_complete(); } - update_gc_metrics(); - done(); } } -void GarbageCollectionOperation::merge_received_bucket_info_into_db() { - // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. - _manager->operation_context().update_bucket_database(getBucket(), _replica_info); +void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { + _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(), + from_node, reply.getBucketInfo()); + _max_documents_removed = std::max(_max_documents_removed, reply.documents_removed()); +} + +void GarbageCollectionOperation::handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { + update_replica_response_info_from_reply(from_node, reply); +} + +GarbageCollectionOperation::RemoveCandidateSet +GarbageCollectionOperation::steal_selection_matches_as_set(api::RemoveLocationReply& reply) { + auto candidates = reply.steal_selection_matches(); + RemoveCandidateSet set; + set.resize(candidates.size()); + for (auto& cand : candidates) { + set.insert(std::move(cand)); + } + return set; +} + +void GarbageCollectionOperation::handle_ok_phase1_reply(api::RemoveLocationReply& reply) { + assert(reply.documents_removed() == 0); + if (_phase1_replies_received == 0) { + // Establish baseline candidate set. Since we require an intersection between all + // sets, the number of candidates can never be _greater_ than that of the first reply. + _remove_candidate_set = steal_selection_matches_as_set(reply); + } else if (!_remove_candidate_set.empty()) { + auto their_set = steal_selection_matches_as_set(reply); + std::vector<spi::IdAndTimestamp> to_remove; + for (auto& our_cand : _remove_candidate_set) { + if (!their_set.contains(our_cand)) { + to_remove.emplace_back(our_cand); + } + } + for (auto& rm_entry : to_remove) { + _remove_candidate_set.erase(rm_entry); + } + } + ++_phase1_replies_received; +} + +void GarbageCollectionOperation::handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { + update_replica_response_info_from_reply(from_node, reply); +} + +bool GarbageCollectionOperation::may_start_write_phase() const { + if (!_ok) { + return false; // Already broken, no reason to proceed. + } + const auto state_version_now = _bucketSpace->getClusterState().getVersion(); + if ((state_version_now != _cluster_state_version_at_phase1_start_time) || + _bucketSpace->has_pending_cluster_state()) + { + LOG(debug, "GC(%s): not sending write phase; cluster state has changed, or a change is pending", + getBucket().toString().c_str()); + return false; + } + // If bucket is gone, or has become inconsistently split, abort mission. + std::vector<BucketDatabase::Entry> entries; + _bucketSpace->getBucketDatabase().getAll(getBucketId(), entries); + if ((entries.size() != 1) || (entries[0].getBucketId() != getBucketId())) { + LOG(debug, "GC(%s): not sending write phase; bucket has become inconsistent", + getBucket().toString().c_str()); + return false; + } + return true; +} + +void GarbageCollectionOperation::on_metadata_read_phase_done(DistributorStripeMessageSender& sender) { + if (!may_start_write_phase()) { + _ok = false; + mark_operation_complete(); + return; + } + std::vector<spi::IdAndTimestamp> already_pending_write; + for (auto& cand : _remove_candidate_set) { + auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.id); + if (maybe_seq_token.valid()) { + _gc_write_locks.emplace_back(std::move(maybe_seq_token)); + LOG(spam, "GC(%s): acquired write lock for '%s'; adding to GC set", + getBucket().toString().c_str(), cand.id.toString().c_str()); + } else { + already_pending_write.emplace_back(cand); + LOG(spam, "GC(%s): failed to acquire write lock for '%s'; not including in GC set", + getBucket().toString().c_str(), cand.id.toString().c_str()); + } + } + for (auto& rm_entry : already_pending_write) { + _remove_candidate_set.erase(rm_entry); + } + if (_remove_candidate_set.empty()) { + update_last_gc_timestamp_in_db(); // Nothing to remove now, try again later. + mark_operation_complete(); + return; + } + LOG(debug, "GC(%s): Sending phase 2 GC with %zu entries (with acquired write locks). " + "%zu documents had pending writes and could not be GCd at this time", + getBucket().toString().c_str(), _remove_candidate_set.size(), already_pending_write.size()); + transition_to(Phase::WriteRemovesPhase); + send_current_phase_remove_locations(sender); +} + +void GarbageCollectionOperation::update_last_gc_timestamp_in_db() { BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); if (dbentry.valid()) { dbentry->setLastGarbageCollectionTime( _manager->node_context().clock().getTimeInSeconds().getTime()); + LOG(debug, "GC(%s): Tagging bucket completed at time %u", + getBucket().toString().c_str(), dbentry->getLastGarbageCollectionTime()); _bucketSpace->getBucketDatabase().update(dbentry); } } +void GarbageCollectionOperation::merge_received_bucket_info_into_db() { + // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. + _manager->operation_context().update_bucket_database(getBucket(), _replica_info); + update_last_gc_timestamp_in_db(); +} + void GarbageCollectionOperation::update_gc_metrics() { auto metric_base = _manager->getMetrics().operations[IdealStateOperation::GARBAGE_COLLECTION]; auto gc_metrics = std::dynamic_pointer_cast<GcMetricSet>(metric_base); @@ -85,6 +272,21 @@ void GarbageCollectionOperation::update_gc_metrics() { gc_metrics->documents_removed.inc(_max_documents_removed); } +void GarbageCollectionOperation::mark_operation_complete() { + assert(!_is_done); + if (_ok) { + update_gc_metrics(); + } + done(); + _is_done = true; +} + +void GarbageCollectionOperation::transition_to(Phase new_phase) { + LOG(spam, "GC(%s): state transition %s -> %s", + getBucket().toString().c_str(), to_string(_phase), to_string(new_phase)); + _phase = new_phase; +} + bool GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const { return true; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index f51739242b7..adbbd210877 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -4,14 +4,16 @@ #include "idealstateoperation.h" #include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> +#include <vespa/storage/distributor/operation_sequencer.h> +#include <vespa/persistence/spi/id_and_timestamp.h> +#include <vespa/vespalib/stllike/hash_set.h> #include <vector> namespace storage::distributor { class PendingMessageTracker; -class GarbageCollectionOperation : public IdealStateOperation -{ +class GarbageCollectionOperation final : public IdealStateOperation { public: GarbageCollectionOperation(const ClusterContext& cluster_ctx, const BucketAndNodes& nodes); @@ -22,15 +24,51 @@ public: const char* getName() const override { return "garbagecollection"; }; Type getType() const override { return GARBAGE_COLLECTION; } bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; + bool is_two_phase() const noexcept { + return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase)); + } + bool is_done() const noexcept { return _is_done; } protected: MessageTracker _tracker; private: - std::vector<BucketCopy> _replica_info; - uint32_t _max_documents_removed; + enum class Phase { + NotStarted, + LegacySinglePhase, + ReadMetadataPhase, + WriteRemovesPhase + }; + static const char* to_string(Phase phase) noexcept; + + using RemoveCandidateSet = vespalib::hash_set<spi::IdAndTimestamp, spi::IdAndTimestamp::hash>; + + Phase _phase; + uint32_t _cluster_state_version_at_phase1_start_time; + uint32_t _phase1_replies_received; + RemoveCandidateSet _remove_candidate_set; + std::vector<SequencingHandle> _gc_write_locks; + std::vector<BucketCopy> _replica_info; + uint32_t _max_documents_removed; + bool _is_done; + + static RemoveCandidateSet steal_selection_matches_as_set(api::RemoveLocationReply& reply); + + void send_current_phase_remove_locations(DistributorStripeMessageSender& sender); + std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const; + + void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply); + void handle_ok_phase1_reply(api::RemoveLocationReply& reply); + void handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply); + void update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply); + void on_metadata_read_phase_done(DistributorStripeMessageSender& sender); + [[nodiscard]] bool may_start_write_phase() const; + [[nodiscard]] bool all_involved_nodes_support_two_phase_gc() const noexcept; + void update_last_gc_timestamp_in_db(); void merge_received_bucket_info_into_db(); void update_gc_metrics(); + void mark_operation_complete(); + void transition_to(Phase new_phase); }; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index f8f35afe821..7e62506b77f 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -179,8 +179,9 @@ public: /** Set the priority we should send messages from this operation with. */ - void setPriority(api::StorageMessage::Priority priority) - { _priority = priority; } + void setPriority(api::StorageMessage::Priority priority) noexcept { + _priority = priority; + } /** * Returns true if we are blocked to start this operation given diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 30071d4d962..dadbda60021 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -340,7 +340,8 @@ PendingClusterState::update_node_supported_features_from_reply(uint16_t node, co { const auto& src_feat = reply.supported_node_features(); NodeSupportedFeatures dest_feat; - dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining; + dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining; + dest_feat.two_phase_remove_location = src_feat.two_phase_remove_location; // This will overwrite per bucket-space reply, but does not matter since it's independent of bucket space. _node_features.insert(std::make_pair(node, dest_feat)); } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index d5bf733a30c..cc1bd557298 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -406,19 +406,34 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack { tracker->setMetric(_env._metrics.removeLocation); - LOG(debug, "RemoveLocation(%s): using selection '%s'", - cmd.getBucketId().toString().c_str(), - cmd.getDocumentSelection().c_str()); - spi::Bucket bucket(cmd.getBucket()); - UnrevertableRemoveEntryProcessor::DocumentIdsAndTimeStamps to_remove; - UnrevertableRemoveEntryProcessor processor(to_remove); - - { - auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ); - BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), - std::make_shared<document::DocIdOnly>(), - processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context()); + const bool is_legacy = (!cmd.only_enumerate_docs() && cmd.explicit_remove_set().empty()); + std::vector<spi::IdAndTimestamp> to_remove; + + LOG(debug, "RemoveLocation(%s): using selection '%s' (enumerate only: %s, remove set size: %zu)", + bucket.toString().c_str(), + cmd.getDocumentSelection().c_str(), + (cmd.only_enumerate_docs() ? "yes" : "no"), + cmd.explicit_remove_set().size()); + + if (is_legacy || cmd.only_enumerate_docs()) { + UnrevertableRemoveEntryProcessor processor(to_remove); + { + auto usage = vespalib::CpuUsage::use(CpuUsage::Category::READ); + BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), + std::make_shared<document::DocIdOnly>(), + processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context()); + } + if (!is_legacy) { + LOG(debug, "RemoveLocation(%s): returning 1st phase results with %zu entries", + bucket.toString().c_str(), to_remove.size()); + auto reply = std::make_shared<api::RemoveLocationReply>(cmd, 0); // No docs removed yet + reply->set_selection_matches(std::move(to_remove)); + tracker->setReply(std::move(reply)); + return tracker; + } + } else { + to_remove = cmd.steal_explicit_remove_set(); } auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) { @@ -427,6 +442,8 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack tracker->sendReply(); }); + // In the case where a _newer_ mutation exists for a given entry in to_remove, it will be ignored + // (with no tombstone added) since we only preserve the newest operation for a document. _spi.removeAsync(bucket, std::move(to_remove), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto index cb923db3c3c..92c0fdc0b87 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -88,9 +88,26 @@ message RevertResponse { BucketId remapped_bucket_id = 2; } +message IdAndTimestamp { + DocumentId id = 1; + uint64 timestamp = 2; +} + +message PhaseOneRemove { + // Currently empty; its presence is enough +} + +message PhaseTwoRemove { + repeated IdAndTimestamp explicit_remove_set = 1; +} + message RemoveLocationRequest { - Bucket bucket = 1; - bytes document_selection = 2; + Bucket bucket = 1; + bytes document_selection = 2; + oneof phased_remove { + PhaseOneRemove phase_one = 3; + PhaseTwoRemove phase_two = 4; + } } message RemoveLocationStats { @@ -98,7 +115,8 @@ message RemoveLocationStats { } message RemoveLocationResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; - RemoveLocationStats stats = 3; + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + RemoveLocationStats stats = 3; + repeated IdAndTimestamp selection_matches = 4; // Iff reply to phase 1 remove } diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto index 7f7ab1d7c0b..a79ac9fd99a 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -110,7 +110,8 @@ message BucketAndBucketInfo { } message SupportedNodeFeatures { - bool unordered_merge_chaining = 1; + bool unordered_merge_chaining = 1; + bool two_phase_remove_location = 2; } message RequestBucketInfoResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index dee766b4b2d..662408c4e95 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -646,30 +646,83 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cm // RemoveLocation // ----------------------------------------------------------------- +namespace { + +void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) { + *dest.mutable_id() = src.toString(); +} + +document::DocumentId get_document_id(const protobuf::DocumentId& src) { + return document::DocumentId(src.id()); // id() shall always be null terminated +} + +void set_id_and_timestamp_vector(::google::protobuf::RepeatedPtrField<protobuf::IdAndTimestamp>& dest, + const std::vector<spi::IdAndTimestamp>& src) +{ + dest.Reserve(src.size()); + for (const auto& src_entry : src) { + auto* dest_entry = dest.Add(); + dest_entry->set_timestamp(src_entry.timestamp); + set_document_id(*dest_entry->mutable_id(), src_entry.id); + } +} + +std::vector<spi::IdAndTimestamp> +get_id_and_timestamp_vector(const ::google::protobuf::RepeatedPtrField<protobuf::IdAndTimestamp>& src) +{ + std::vector<spi::IdAndTimestamp> vec; + vec.reserve(src.size()); + for (const auto& src_entry : src) { + vec.emplace_back(get_document_id(src_entry.id()), spi::Timestamp(src_entry.timestamp())); + } + return vec; +} + +} + void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) { req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + if (msg.only_enumerate_docs()) { + req.mutable_phase_one(); // Instantiating it is enough + } else if (!msg.explicit_remove_set().empty()) { + set_id_and_timestamp_vector(*req.mutable_phase_two()->mutable_explicit_remove_set(), + msg.explicit_remove_set()); + } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, [&](auto& res) { res.mutable_stats()->set_documents_removed(msg.documents_removed()); + if (!msg.selection_matches().empty()) { + set_id_and_timestamp_vector(*res.mutable_selection_matches(), msg.selection_matches()); + } }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) { - return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket); + auto cmd = std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket); + if (req.has_phase_one()) { + cmd->set_only_enumerate_docs(true); + } else if (req.has_phase_two()) { + cmd->set_explicit_remove_set(get_id_and_timestamp_vector(req.phase_two().explicit_remove_set())); + } + return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&](auto& res) { uint32_t documents_removed = (res.has_stats() ? res.stats().documents_removed() : 0u); - return std::make_unique<api::RemoveLocationReply>( + auto reply = std::make_unique<api::RemoveLocationReply>( static_cast<const api::RemoveLocationCommand&>(cmd), documents_removed); + if (!res.selection_matches().empty()) { + reply->set_selection_matches(get_id_and_timestamp_vector(res.selection_matches())); + } + return reply; }); } @@ -1004,6 +1057,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe // We mark features as available at protocol level. Only included for full bucket fetch responses. if (msg.full_bucket_fetch()) { res.mutable_supported_node_features()->set_unordered_merge_chaining(true); + res.mutable_supported_node_features()->set_two_phase_remove_location(true); } }); } @@ -1044,7 +1098,8 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con if (res.has_supported_node_features()) { const auto& src_features = res.supported_node_features(); auto& dest_features = reply->supported_node_features(); - dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining(); + dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining(); + dest_features.two_phase_remove_location = src_features.two_phase_remove_location(); } return reply; }); diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h index 47785a92039..d9843236d2e 100644 --- a/storage/src/vespa/storageapi/message/bucket.h +++ b/storage/src/vespa/storageapi/message/bucket.h @@ -393,7 +393,8 @@ public: friend std::ostream& operator<<(std::ostream& os, const Entry&); }; struct SupportedNodeFeatures { - bool unordered_merge_chaining = false; + bool unordered_merge_chaining = false; + bool two_phase_remove_location = false; }; using EntryVector = vespalib::Array<Entry>; private: diff --git a/storage/src/vespa/storageapi/message/removelocation.cpp b/storage/src/vespa/storageapi/message/removelocation.cpp index 7b7ed894b2c..5d558c5e305 100644 --- a/storage/src/vespa/storageapi/message/removelocation.cpp +++ b/storage/src/vespa/storageapi/message/removelocation.cpp @@ -11,15 +11,17 @@ IMPLEMENT_REPLY(RemoveLocationReply) RemoveLocationCommand::RemoveLocationCommand(vespalib::stringref documentSelection, const document::Bucket &bucket) : BucketInfoCommand(MessageType::REMOVELOCATION, bucket), - _documentSelection(documentSelection) + _documentSelection(documentSelection), + _explicit_remove_set(), + _only_enumerate_docs(false) {} -RemoveLocationCommand::~RemoveLocationCommand() {} +RemoveLocationCommand::~RemoveLocationCommand() = default; void RemoveLocationCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { - if (_documentSelection.length()) { + if (!_documentSelection.empty()) { out << "Remove selection(" << _documentSelection << "): "; } BucketInfoCommand::print(out, verbose, indent); diff --git a/storage/src/vespa/storageapi/message/removelocation.h b/storage/src/vespa/storageapi/message/removelocation.h index 276b090cc57..4e7c1d26711 100644 --- a/storage/src/vespa/storageapi/message/removelocation.h +++ b/storage/src/vespa/storageapi/message/removelocation.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/storageapi/defs.h> +#include <vespa/persistence/spi/id_and_timestamp.h> #include <vespa/storageapi/messageapi/storagecommand.h> #include <vespa/storageapi/messageapi/bucketinforeply.h> @@ -15,13 +16,34 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; const vespalib::string& getDocumentSelection() const { return _documentSelection; } + // TODO move to factory pattern instead to disallow creating illegal combinations + void set_only_enumerate_docs(bool only_enumerate) noexcept { + _only_enumerate_docs = only_enumerate; + } + [[nodiscard]] bool only_enumerate_docs() const noexcept { + return _only_enumerate_docs; + } + void set_explicit_remove_set(std::vector<spi::IdAndTimestamp> remove_set) { + _explicit_remove_set = std::move(remove_set); + } + const std::vector<spi::IdAndTimestamp>& explicit_remove_set() const noexcept { + return _explicit_remove_set; + } + std::vector<spi::IdAndTimestamp> steal_explicit_remove_set() const noexcept { + return std::move(_explicit_remove_set); + } + DECLARE_STORAGECOMMAND(RemoveLocationCommand, onRemoveLocation); private: + // TODO make variant? Only one of the two may be used vespalib::string _documentSelection; + std::vector<spi::IdAndTimestamp> _explicit_remove_set; + bool _only_enumerate_docs; }; class RemoveLocationReply : public BucketInfoReply { + std::vector<spi::IdAndTimestamp> _selection_matches; // For use in 1st phase GC uint32_t _documents_removed; public: explicit RemoveLocationReply(const RemoveLocationCommand& cmd, uint32_t docs_removed = 0); @@ -29,6 +51,16 @@ public: _documents_removed = docs_removed; } uint32_t documents_removed() const noexcept { return _documents_removed; } + // TODO refactor + void set_selection_matches(std::vector<spi::IdAndTimestamp> matches) noexcept { + _selection_matches = std::move(matches); + } + const std::vector<spi::IdAndTimestamp>& selection_matches() const noexcept { + return _selection_matches; + } + std::vector<spi::IdAndTimestamp> steal_selection_matches() noexcept { + return std::move(_selection_matches); + } DECLARE_STORAGEREPLY(RemoveLocationReply, onRemoveLocationReply) }; |