diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-09-06 21:58:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-06 21:58:22 +0200 |
commit | 343dd16027714ca43f677d3de711cf5625aac158 (patch) | |
tree | f820f1649b2356e8436732a0cce51508fb6de2ae | |
parent | c2236fbba225babc5088cc9a06638b76b2183b69 (diff) | |
parent | c4701a64fa95c00e4133a29fadc84a0771b34831 (diff) |
Merge pull request #23951 from vespa-engine/vekterli/make-gc-work-with-parent-child-with-subset-indexed
Make two-phase GC work for parent-child with subset of replicas indexed [run-systemtest]
3 files changed, 46 insertions, 88 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 4f9fd25098b..1a104727f43 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -26,7 +26,7 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { spi::IdAndTimestamp _e2; spi::IdAndTimestamp _e3; spi::IdAndTimestamp _e4; - + spi::IdAndTimestamp _e5; GarbageCollectionOperationTest() : _bucket_id(16, 1), @@ -35,7 +35,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { _e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)), _e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)), _e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)), - _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400)) + _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400)), + _e5(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(500)) // Same as e4 but with higher timestamp {} void SetUp() override { @@ -225,16 +226,16 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l } } -TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) { +TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_union_of_returned_entries_with_feed_pri) { enable_two_phase_gc(); auto op = create_op(); op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); auto r1 = make_remove_location_reply(*_sender.command(0)); - r1->set_selection_matches({_e1, _e2, _e3}); + r1->set_selection_matches({_e1, _e2, _e3, _e5}); auto r2 = make_remove_location_reply(*_sender.command(1)); - r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1 + r2->set_selection_matches({_e2, _e3, _e4}); _sender.commands().clear(); op->receive(_sender, r1); @@ -242,7 +243,8 @@ TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_return op->receive(_sender, r2); ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent - std::vector<spi::IdAndTimestamp> expected({_e2, _e3}); + // e5 is same doc as e4, but at a higher timestamp; only e5 entry should be included. + std::vector<spi::IdAndTimestamp> expected({_e1, _e2, _e3, _e5}); for (int i : {0, 1}) { auto cmd = as_remove_location_command(_sender.command(i)); EXPECT_FALSE(cmd->only_enumerate_docs()); @@ -267,47 +269,6 @@ TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_res EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); } -TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) { - enable_two_phase_gc(); - auto op = create_op(); - op->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ(2, _sender.commands().size()); - - // No docs in common - auto r1 = make_remove_location_reply(*_sender.command(0)); - r1->set_selection_matches({_e1}); - auto r2 = make_remove_location_reply(*_sender.command(1)); - r2->set_selection_matches({_e2}); - - _sender.commands().clear(); - op->receive(_sender, r1); - op->receive(_sender, r2); - - EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); -} - -// We explicitly test the case where the first reply has an empty result set since we internally -// establish the baseline candidate set from the first reply. This test case leaks some internal -// implementation details, but such is life. -TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) { - enable_two_phase_gc(); - auto op = create_op(); - op->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ(2, _sender.commands().size()); - - auto r1 = make_remove_location_reply(*_sender.command(0)); - r1->set_selection_matches({}); - auto r2 = make_remove_location_reply(*_sender.command(1)); - r2->set_selection_matches({_e1, _e2, _e3, _e4}); - - _sender.commands().clear(); - op->receive(_sender, r1); - op->receive(_sender, r2); - - EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op)); -} - - TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) { enable_two_phase_gc(); auto op = create_op(); @@ -317,7 +278,7 @@ TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_s auto r1 = make_remove_location_reply(*_sender.command(0)); r1->set_selection_matches({_e1, _e2, _e3}); auto r2 = make_remove_location_reply(*_sender.command(1)); - r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1 + r2->set_selection_matches({_e2, _e3, _e4}); _sender.commands().clear(); op->receive(_sender, r1); @@ -421,6 +382,7 @@ TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_an _sender.commands().clear(); op->receive(_sender, r1); op->receive(_sender, r2); + ASSERT_EQ(2, _sender.commands().size()); // Locks on e1 and e3 are held while GC removes are sent auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 3e82b6b10a7..d3e4e49c193 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -7,7 +7,7 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/node_supported_features_repo.h> #include <vespa/storageapi/message/removelocation.h> -#include <vespa/vespalib/stllike/hash_set.hpp> +#include <vespa/vespalib/stllike/hash_map.hpp> #include <algorithm> #include <vespa/log/log.h> @@ -20,8 +20,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu _tracker(cluster_ctx), _phase(Phase::NotStarted), _cluster_state_version_at_phase1_start_time(0), - _phase1_replies_received(0), - _remove_candidate_set(), + _remove_candidates(), _replica_info(), _max_documents_removed(0), _is_done(false) @@ -50,7 +49,11 @@ bool GarbageCollectionOperation::all_involved_nodes_support_two_phase_gc() const } std::vector<spi::IdAndTimestamp> GarbageCollectionOperation::compile_phase_two_send_set() const { - std::vector<spi::IdAndTimestamp> docs_to_remove(_remove_candidate_set.begin(), _remove_candidate_set.end()); + std::vector<spi::IdAndTimestamp> docs_to_remove; + docs_to_remove.reserve(_remove_candidates.size()); + for (const auto& cand : _remove_candidates) { + docs_to_remove.emplace_back(cand.first, cand.second); + } // Use timestamp order to provide test determinism and allow for backend linear merging (if needed). // Tie-break on GID upon collisions (which technically should never happen...!) auto ts_then_gid_order = [](const spi::IdAndTimestamp& lhs, const spi::IdAndTimestamp& rhs) noexcept { @@ -155,36 +158,24 @@ void GarbageCollectionOperation::handle_ok_legacy_reply(uint16_t from_node, cons update_replica_response_info_from_reply(from_node, reply); } -GarbageCollectionOperation::RemoveCandidateSet -GarbageCollectionOperation::steal_selection_matches_as_set(api::RemoveLocationReply& reply) { +GarbageCollectionOperation::RemoveCandidates +GarbageCollectionOperation::steal_selection_matches_as_candidates(api::RemoveLocationReply& reply) { auto candidates = reply.steal_selection_matches(); - RemoveCandidateSet set; - set.resize(candidates.size()); + RemoveCandidates as_map; + as_map.resize(candidates.size()); for (auto& cand : candidates) { - set.insert(std::move(cand)); + as_map.insert(std::make_pair(std::move(cand.id), cand.timestamp)); } - return set; + return as_map; } void GarbageCollectionOperation::handle_ok_phase1_reply(api::RemoveLocationReply& reply) { assert(reply.documents_removed() == 0); - if (_phase1_replies_received == 0) { - // Establish baseline candidate set. Since we require an intersection between all - // sets, the number of candidates can never be _greater_ than that of the first reply. - _remove_candidate_set = steal_selection_matches_as_set(reply); - } else if (!_remove_candidate_set.empty()) { - auto their_set = steal_selection_matches_as_set(reply); - std::vector<spi::IdAndTimestamp> to_remove; - for (auto& our_cand : _remove_candidate_set) { - if (!their_set.contains(our_cand)) { - to_remove.emplace_back(our_cand); - } - } - for (auto& rm_entry : to_remove) { - _remove_candidate_set.erase(rm_entry); - } + auto their_matches = steal_selection_matches_as_candidates(reply); + for (auto& new_cand : their_matches) { + auto& maybe_existing_ts = _remove_candidates[new_cand.first]; + maybe_existing_ts = std::max(new_cand.second, maybe_existing_ts); } - ++_phase1_replies_received; } void GarbageCollectionOperation::handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { @@ -220,30 +211,30 @@ void GarbageCollectionOperation::on_metadata_read_phase_done(DistributorStripeMe mark_operation_complete(); return; } - std::vector<spi::IdAndTimestamp> already_pending_write; - for (auto& cand : _remove_candidate_set) { - auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.id); + std::vector<document::DocumentId> already_pending_write; + for (auto& cand : _remove_candidates) { + auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.first); if (maybe_seq_token.valid()) { _gc_write_locks.emplace_back(std::move(maybe_seq_token)); LOG(spam, "GC(%s): acquired write lock for '%s'; adding to GC set", - getBucket().toString().c_str(), cand.id.toString().c_str()); + getBucket().toString().c_str(), cand.first.toString().c_str()); } else { - already_pending_write.emplace_back(cand); + already_pending_write.emplace_back(cand.first); LOG(spam, "GC(%s): failed to acquire write lock for '%s'; not including in GC set", - getBucket().toString().c_str(), cand.id.toString().c_str()); + getBucket().toString().c_str(), cand.first.toString().c_str()); } } for (auto& rm_entry : already_pending_write) { - _remove_candidate_set.erase(rm_entry); + _remove_candidates.erase(rm_entry); } - if (_remove_candidate_set.empty()) { + if (_remove_candidates.empty()) { update_last_gc_timestamp_in_db(); // Nothing to remove now, try again later. mark_operation_complete(); return; } LOG(debug, "GC(%s): Sending phase 2 GC with %zu entries (with acquired write locks). " "%zu documents had pending writes and could not be GCd at this time", - getBucket().toString().c_str(), _remove_candidate_set.size(), already_pending_write.size()); + getBucket().toString().c_str(), _remove_candidates.size(), already_pending_write.size()); transition_to(Phase::WriteRemovesPhase); send_current_phase_remove_locations(sender); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index adbbd210877..25308b0fb4b 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -2,11 +2,12 @@ #pragma once #include "idealstateoperation.h" +#include <vespa/document/base/documentid.h> #include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> #include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/persistence/spi/id_and_timestamp.h> -#include <vespa/vespalib/stllike/hash_set.h> +#include <vespa/vespalib/stllike/hash_map.h> #include <vector> namespace storage::distributor { @@ -41,18 +42,22 @@ private: static const char* to_string(Phase phase) noexcept; - using RemoveCandidateSet = vespalib::hash_set<spi::IdAndTimestamp, spi::IdAndTimestamp::hash>; + struct DocIdHasher { + size_t operator()(const document::DocumentId& id) const noexcept { + return document::GlobalId::hash()(id.getGlobalId()); + } + }; + using RemoveCandidates = vespalib::hash_map<document::DocumentId, spi::Timestamp, DocIdHasher>; Phase _phase; uint32_t _cluster_state_version_at_phase1_start_time; - uint32_t _phase1_replies_received; - RemoveCandidateSet _remove_candidate_set; + RemoveCandidates _remove_candidates; std::vector<SequencingHandle> _gc_write_locks; std::vector<BucketCopy> _replica_info; uint32_t _max_documents_removed; bool _is_done; - static RemoveCandidateSet steal_selection_matches_as_set(api::RemoveLocationReply& reply); + static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply); void send_current_phase_remove_locations(DistributorStripeMessageSender& sender); std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const; |