diff options
Diffstat (limited to 'storage/src/tests/distributor/garbagecollectiontest.cpp')
-rw-r--r-- | storage/src/tests/distributor/garbagecollectiontest.cpp | 335 |
1 files changed, 329 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 07ef4604e20..4f9fd25098b 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -11,33 +11,83 @@ #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; +using document::BucketId; +using document::DocumentId; +using document::FixedBucketSpaces; using namespace ::testing; namespace storage::distributor { struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { + BucketId _bucket_id; + OperationSequencer _operation_sequencer; + uint32_t _gc_start_time_sec; + spi::IdAndTimestamp _e1; + spi::IdAndTimestamp _e2; + spi::IdAndTimestamp _e3; + spi::IdAndTimestamp _e4; + + + GarbageCollectionOperationTest() + : _bucket_id(16, 1), + _operation_sequencer(), + _gc_start_time_sec(34), + _e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)), + _e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)), + _e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)), + _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400)) + {} + void SetUp() override { createLinks(); - enable_cluster_state("distributor:1 storage:2"); - addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300"); + enable_cluster_state("version:10 distributor:1 storage:2"); + addNodesToBucketDB(_bucket_id, "0=250/50/300,1=250/50/300"); auto cfg = make_config(); cfg->setGarbageCollection("music.date < 34", 3600s); configure_stripe(cfg); - getClock().setAbsoluteTimeInSeconds(34); + getClock().setAbsoluteTimeInSeconds(_gc_start_time_sec); + _sender.set_operation_sequencer(_operation_sequencer); }; void TearDown() override { close(); } + void enable_two_phase_gc() { + NodeSupportedFeatures with_two_phase; + with_two_phase.two_phase_remove_location = true; + set_node_supported_features(0, with_two_phase); + set_node_supported_features(1, with_two_phase); + + config_enable_two_phase_gc(true); + } + + void config_enable_two_phase_gc(bool enabled) { + auto config = make_config(); + config->set_enable_two_phase_garbage_collection(enabled); + configure_stripe(std::move(config)); + } + std::shared_ptr<GarbageCollectionOperation> create_op() { auto op = std::make_shared<GarbageCollectionOperation>( - dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + dummy_cluster_context, BucketAndNodes(makeDocumentBucket(_bucket_id), toVector<uint16_t>(0, 1))); op->setIdealStateManager(&getIdealStateManager()); return op; } + static std::shared_ptr<api::RemoveLocationCommand> as_remove_location_command(const std::shared_ptr<api::StorageCommand>& cmd) { + auto msg = std::dynamic_pointer_cast<api::RemoveLocationCommand>(cmd); + assert(msg); + return msg; + } + + static std::shared_ptr<api::RemoveLocationReply> make_remove_location_reply(api::StorageCommand& msg) { + auto reply = std::shared_ptr<api::StorageReply>(msg.makeReply()); + assert(reply->getType() == api::MessageType::REMOVELOCATION_REPLY); + return std::dynamic_pointer_cast<api::RemoveLocationReply>(reply); + } + // FIXME fragile to assume that send order == node index, but that's the way it currently works void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum, uint32_t n_docs_removed) { @@ -51,8 +101,14 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { op.receive(_sender, reply); } + void assert_bucket_last_gc_timestamp_is(uint32_t gc_time) { + BucketDatabase::Entry entry = getBucket(_bucket_id); + ASSERT_TRUE(entry.valid()); + EXPECT_EQ(entry->getLastGarbageCollectionTime(), gc_time); + } + void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) { - BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1)); + BucketDatabase::Entry entry = getBucket(_bucket_id); ASSERT_TRUE(entry.valid()); ASSERT_EQ(entry->getNodeCount(), info.size()); EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); @@ -69,11 +125,21 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { assert(gc_metrics); return gc_metrics->documents_removed.getValue(); } + + void assert_gc_op_completed_ok_without_second_phase(GarbageCollectionOperation& op) { + ASSERT_EQ(0u, _sender.commands().size()); + EXPECT_TRUE(op.is_done()); + EXPECT_TRUE(op.ok()); // It's not a failure to have nothing to do + // GC timestamp must be updated so we can move on to another bucket. + EXPECT_NO_FATAL_FAILURE(assert_bucket_last_gc_timestamp_is(_gc_start_time_sec)); + EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed + } }; -TEST_F(GarbageCollectionOperationTest, simple) { +TEST_F(GarbageCollectionOperationTest, simple_legacy) { auto op = create_op(); op->start(_sender, framework::MilliSecTime(0)); + EXPECT_FALSE(op->is_two_phase()); ASSERT_EQ(2, _sender.commands().size()); EXPECT_EQ(0u, gc_removed_documents_metric()); @@ -119,4 +185,261 @@ TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_s ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34)); } +TEST_F(GarbageCollectionOperationTest, two_phase_gc_requires_config_enabling_and_explicit_node_support) { + NodeSupportedFeatures with_two_phase; + with_two_phase.two_phase_remove_location = true; + set_node_supported_features(1, with_two_phase); + + config_enable_two_phase_gc(true); + + // Config enabled, but only 1 node says it supports two-phase RemoveLocation + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + EXPECT_FALSE(op->is_two_phase()); + + // Node 0 suddenly upgraded...! + set_node_supported_features(0, with_two_phase); + op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + EXPECT_TRUE(op->is_two_phase()); + + // But doesn't matter if two-phase GC is config-disabled + config_enable_two_phase_gc(false); + + op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + EXPECT_FALSE(op->is_two_phase()); +} + +TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_locations_with_provided_gc_pri) { + enable_two_phase_gc(); + auto op = create_op(); + op->setPriority(getConfig().getMaintenancePriorities().garbageCollection); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + for (int i : {0, 1}) { + auto cmd = as_remove_location_command(_sender.command(i)); + EXPECT_TRUE(cmd->only_enumerate_docs()); + EXPECT_EQ(cmd->getPriority(), getConfig().getMaintenancePriorities().garbageCollection); + } +} + +TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1, _e2, _e3}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1 + + _sender.commands().clear(); + op->receive(_sender, r1); + ASSERT_EQ(0u, _sender.commands().size()); // No phase 2 yet, must get reply from all nodes + op->receive(_sender, r2); + ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent + + std::vector<spi::IdAndTimestamp> expected({_e2, _e3}); + for (int i : {0, 1}) { + auto cmd = as_remove_location_command(_sender.command(i)); + EXPECT_FALSE(cmd->only_enumerate_docs()); + EXPECT_EQ(cmd->explicit_remove_set(), expected); + EXPECT_EQ(cmd->getPriority(), getConfig().default_external_feed_priority()); + } +} + +TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_results) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + auto r2 = make_remove_location_reply(*_sender.command(1)); + _sender.commands().clear(); + // Empty result sets in both replies + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); +} + +TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + // No docs in common + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e2}); + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); +} + +// We explicitly test the case where the first reply has an empty result set since we internally +// establish the baseline candidate set from the first reply. This test case leaks some internal +// implementation details, but such is life. +TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e1, _e2, _e3, _e4}); + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); +} + + +TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1, _e2, _e3}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1 + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent + + r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_documents_removed(3); + r1->setBucketInfo(api::BucketInfo(0x1234, 90, 500)); + + r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_documents_removed(3); + r2->setBucketInfo(api::BucketInfo(0x4567, 90, 500)); + + op->receive(_sender, r1); + op->receive(_sender, r2); + + EXPECT_TRUE(op->ok()); + EXPECT_TRUE(op->is_done()); + EXPECT_EQ(3u, gc_removed_documents_metric()); + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(0x1234, 90, 500), + api::BucketInfo(0x4567, 90, 500)}, + _gc_start_time_sec)); +} + +struct GarbageCollectionOperationPhase1FailureTest : GarbageCollectionOperationTest { + std::shared_ptr<GarbageCollectionOperation> _op; + std::shared_ptr<api::RemoveLocationReply> _r1; + std::shared_ptr<api::RemoveLocationReply> _r2; + + void SetUp() override { + GarbageCollectionOperationTest::SetUp(); + + enable_two_phase_gc(); + _op = create_op(); + _op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + _r1 = make_remove_location_reply(*_sender.command(0)); + _r1->set_selection_matches({_e1}); + _r2 = make_remove_location_reply(*_sender.command(1)); + _r2->set_selection_matches({_e1}); + } + + void receive_phase1_replies() { + _sender.commands().clear(); + _op->receive(_sender, _r1); + _op->receive(_sender, _r2); + } + + void receive_phase1_replies_and_assert_no_phase_2_started() { + receive_phase1_replies(); + ASSERT_EQ(0u, _sender.commands().size()); + EXPECT_TRUE(_op->is_done()); + EXPECT_FALSE(_op->ok()); + // GC not completed, so timestamp/bucket DB are _not_ updated + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), // test init values + api::BucketInfo(250, 50, 300)}, + 0/*GC start timestamp*/)); + EXPECT_EQ(0u, gc_removed_documents_metric()); // Nothing removed + } +}; + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_failure_during_first_phase) { + _r2->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "oh no")); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_cluster_state_version_changed_between_phases) { + enable_cluster_state("version:11 distributor:1 storage:2"); // version 10 -> 11 + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_pending_cluster_state_between_phases) { + simulate_set_pending_cluster_state("version:11 distributor:1 storage:2"); // Pending; not enabled yet + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_inconsistently_split_between_phases) { + // Add a logical child of _bucket_id to the bucket tree. This implies an inconsistent split, as we never + // want to have a tree with buckets in inner node positions, only in leaves. + addNodesToBucketDB(BucketId(17, 1), "0=250/50/300,1=250/50/300"); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) { + enable_two_phase_gc(); + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); + + auto r1 = make_remove_location_reply(*_sender.command(0)); + r1->set_selection_matches({_e1, _e2, _e3}); + auto r2 = make_remove_location_reply(*_sender.command(1)); + r2->set_selection_matches({_e1, _e2, _e3}); + + // Grab a lock on e2 to simulate a concurrent write to the document. + auto e2_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e2.id); + ASSERT_TRUE(e2_lock.valid()); + + _sender.commands().clear(); + op->receive(_sender, r1); + op->receive(_sender, r2); + + // Locks on e1 and e3 are held while GC removes are sent + auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id); + EXPECT_FALSE(e1_lock.valid()); + auto e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id); + EXPECT_FALSE(e3_lock.valid()); + + std::vector<spi::IdAndTimestamp> expected({_e1, _e3}); // e2 not included in remove set + for (int i : {0, 1}) { + auto cmd = as_remove_location_command(_sender.command(i)); + EXPECT_EQ(cmd->explicit_remove_set(), expected); + } + + // Locks are implicitly released when the underlying operation is destroyed + op.reset(); + e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id); + EXPECT_TRUE(e1_lock.valid()); + e3_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e3.id); + EXPECT_TRUE(e3_lock.valid()); +} + } // storage::distributor |