summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-09-06 21:58:22 +0200
committerGitHub <noreply@github.com>2022-09-06 21:58:22 +0200
commit343dd16027714ca43f677d3de711cf5625aac158 (patch)
treef820f1649b2356e8436732a0cce51508fb6de2ae /storage
parentc2236fbba225babc5088cc9a06638b76b2183b69 (diff)
parentc4701a64fa95c00e4133a29fadc84a0771b34831 (diff)
Merge pull request #23951 from vespa-engine/vekterli/make-gc-work-with-parent-child-with-subset-indexed
Make two-phase GC work for parent-child with subset of replicas indexed [run-systemtest]
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp58
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp61
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h15
3 files changed, 46 insertions, 88 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 4f9fd25098b..1a104727f43 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -26,7 +26,7 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
spi::IdAndTimestamp _e2;
spi::IdAndTimestamp _e3;
spi::IdAndTimestamp _e4;
-
+ spi::IdAndTimestamp _e5;
GarbageCollectionOperationTest()
: _bucket_id(16, 1),
@@ -35,7 +35,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
_e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)),
_e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)),
_e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)),
- _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400))
+ _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400)),
+ _e5(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(500)) // Same as e4 but with higher timestamp
{}
void SetUp() override {
@@ -225,16 +226,16 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l
}
}
-TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) {
+TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_union_of_returned_entries_with_feed_pri) {
enable_two_phase_gc();
auto op = create_op();
op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
- r1->set_selection_matches({_e1, _e2, _e3});
+ r1->set_selection_matches({_e1, _e2, _e3, _e5});
auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+ r2->set_selection_matches({_e2, _e3, _e4});
_sender.commands().clear();
op->receive(_sender, r1);
@@ -242,7 +243,8 @@ TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_return
op->receive(_sender, r2);
ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent
- std::vector<spi::IdAndTimestamp> expected({_e2, _e3});
+ // e5 is same doc as e4, but at a higher timestamp; only e5 entry should be included.
+ std::vector<spi::IdAndTimestamp> expected({_e1, _e2, _e3, _e5});
for (int i : {0, 1}) {
auto cmd = as_remove_location_command(_sender.command(i));
EXPECT_FALSE(cmd->only_enumerate_docs());
@@ -267,47 +269,6 @@ TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_res
EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
}
-TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) {
- enable_two_phase_gc();
- auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ(2, _sender.commands().size());
-
- // No docs in common
- auto r1 = make_remove_location_reply(*_sender.command(0));
- r1->set_selection_matches({_e1});
- auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e2});
-
- _sender.commands().clear();
- op->receive(_sender, r1);
- op->receive(_sender, r2);
-
- EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
-}
-
-// We explicitly test the case where the first reply has an empty result set since we internally
-// establish the baseline candidate set from the first reply. This test case leaks some internal
-// implementation details, but such is life.
-TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) {
- enable_two_phase_gc();
- auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ(2, _sender.commands().size());
-
- auto r1 = make_remove_location_reply(*_sender.command(0));
- r1->set_selection_matches({});
- auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e1, _e2, _e3, _e4});
-
- _sender.commands().clear();
- op->receive(_sender, r1);
- op->receive(_sender, r2);
-
- EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
-}
-
-
TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) {
enable_two_phase_gc();
auto op = create_op();
@@ -317,7 +278,7 @@ TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_s
auto r1 = make_remove_location_reply(*_sender.command(0));
r1->set_selection_matches({_e1, _e2, _e3});
auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+ r2->set_selection_matches({_e2, _e3, _e4});
_sender.commands().clear();
op->receive(_sender, r1);
@@ -421,6 +382,7 @@ TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_an
_sender.commands().clear();
op->receive(_sender, r1);
op->receive(_sender, r2);
+ ASSERT_EQ(2, _sender.commands().size());
// Locks on e1 and e3 are held while GC removes are sent
auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 3e82b6b10a7..d3e4e49c193 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -7,7 +7,7 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storageapi/message/removelocation.h>
-#include <vespa/vespalib/stllike/hash_set.hpp>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <algorithm>
#include <vespa/log/log.h>
@@ -20,8 +20,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_tracker(cluster_ctx),
_phase(Phase::NotStarted),
_cluster_state_version_at_phase1_start_time(0),
- _phase1_replies_received(0),
- _remove_candidate_set(),
+ _remove_candidates(),
_replica_info(),
_max_documents_removed(0),
_is_done(false)
@@ -50,7 +49,11 @@ bool GarbageCollectionOperation::all_involved_nodes_support_two_phase_gc() const
}
std::vector<spi::IdAndTimestamp> GarbageCollectionOperation::compile_phase_two_send_set() const {
- std::vector<spi::IdAndTimestamp> docs_to_remove(_remove_candidate_set.begin(), _remove_candidate_set.end());
+ std::vector<spi::IdAndTimestamp> docs_to_remove;
+ docs_to_remove.reserve(_remove_candidates.size());
+ for (const auto& cand : _remove_candidates) {
+ docs_to_remove.emplace_back(cand.first, cand.second);
+ }
// Use timestamp order to provide test determinism and allow for backend linear merging (if needed).
// Tie-break on GID upon collisions (which technically should never happen...!)
auto ts_then_gid_order = [](const spi::IdAndTimestamp& lhs, const spi::IdAndTimestamp& rhs) noexcept {
@@ -155,36 +158,24 @@ void GarbageCollectionOperation::handle_ok_legacy_reply(uint16_t from_node, cons
update_replica_response_info_from_reply(from_node, reply);
}
-GarbageCollectionOperation::RemoveCandidateSet
-GarbageCollectionOperation::steal_selection_matches_as_set(api::RemoveLocationReply& reply) {
+GarbageCollectionOperation::RemoveCandidates
+GarbageCollectionOperation::steal_selection_matches_as_candidates(api::RemoveLocationReply& reply) {
auto candidates = reply.steal_selection_matches();
- RemoveCandidateSet set;
- set.resize(candidates.size());
+ RemoveCandidates as_map;
+ as_map.resize(candidates.size());
for (auto& cand : candidates) {
- set.insert(std::move(cand));
+ as_map.insert(std::make_pair(std::move(cand.id), cand.timestamp));
}
- return set;
+ return as_map;
}
void GarbageCollectionOperation::handle_ok_phase1_reply(api::RemoveLocationReply& reply) {
assert(reply.documents_removed() == 0);
- if (_phase1_replies_received == 0) {
- // Establish baseline candidate set. Since we require an intersection between all
- // sets, the number of candidates can never be _greater_ than that of the first reply.
- _remove_candidate_set = steal_selection_matches_as_set(reply);
- } else if (!_remove_candidate_set.empty()) {
- auto their_set = steal_selection_matches_as_set(reply);
- std::vector<spi::IdAndTimestamp> to_remove;
- for (auto& our_cand : _remove_candidate_set) {
- if (!their_set.contains(our_cand)) {
- to_remove.emplace_back(our_cand);
- }
- }
- for (auto& rm_entry : to_remove) {
- _remove_candidate_set.erase(rm_entry);
- }
+ auto their_matches = steal_selection_matches_as_candidates(reply);
+ for (auto& new_cand : their_matches) {
+ auto& maybe_existing_ts = _remove_candidates[new_cand.first];
+ maybe_existing_ts = std::max(new_cand.second, maybe_existing_ts);
}
- ++_phase1_replies_received;
}
void GarbageCollectionOperation::handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
@@ -220,30 +211,30 @@ void GarbageCollectionOperation::on_metadata_read_phase_done(DistributorStripeMe
mark_operation_complete();
return;
}
- std::vector<spi::IdAndTimestamp> already_pending_write;
- for (auto& cand : _remove_candidate_set) {
- auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.id);
+ std::vector<document::DocumentId> already_pending_write;
+ for (auto& cand : _remove_candidates) {
+ auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.first);
if (maybe_seq_token.valid()) {
_gc_write_locks.emplace_back(std::move(maybe_seq_token));
LOG(spam, "GC(%s): acquired write lock for '%s'; adding to GC set",
- getBucket().toString().c_str(), cand.id.toString().c_str());
+ getBucket().toString().c_str(), cand.first.toString().c_str());
} else {
- already_pending_write.emplace_back(cand);
+ already_pending_write.emplace_back(cand.first);
LOG(spam, "GC(%s): failed to acquire write lock for '%s'; not including in GC set",
- getBucket().toString().c_str(), cand.id.toString().c_str());
+ getBucket().toString().c_str(), cand.first.toString().c_str());
}
}
for (auto& rm_entry : already_pending_write) {
- _remove_candidate_set.erase(rm_entry);
+ _remove_candidates.erase(rm_entry);
}
- if (_remove_candidate_set.empty()) {
+ if (_remove_candidates.empty()) {
update_last_gc_timestamp_in_db(); // Nothing to remove now, try again later.
mark_operation_complete();
return;
}
LOG(debug, "GC(%s): Sending phase 2 GC with %zu entries (with acquired write locks). "
"%zu documents had pending writes and could not be GCd at this time",
- getBucket().toString().c_str(), _remove_candidate_set.size(), already_pending_write.size());
+ getBucket().toString().c_str(), _remove_candidates.size(), already_pending_write.size());
transition_to(Phase::WriteRemovesPhase);
send_current_phase_remove_locations(sender);
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index adbbd210877..25308b0fb4b 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -2,11 +2,12 @@
#pragma once
#include "idealstateoperation.h"
+#include <vespa/document/base/documentid.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/persistence/spi/id_and_timestamp.h>
-#include <vespa/vespalib/stllike/hash_set.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
namespace storage::distributor {
@@ -41,18 +42,22 @@ private:
static const char* to_string(Phase phase) noexcept;
- using RemoveCandidateSet = vespalib::hash_set<spi::IdAndTimestamp, spi::IdAndTimestamp::hash>;
+ struct DocIdHasher {
+ size_t operator()(const document::DocumentId& id) const noexcept {
+ return document::GlobalId::hash()(id.getGlobalId());
+ }
+ };
+ using RemoveCandidates = vespalib::hash_map<document::DocumentId, spi::Timestamp, DocIdHasher>;
Phase _phase;
uint32_t _cluster_state_version_at_phase1_start_time;
- uint32_t _phase1_replies_received;
- RemoveCandidateSet _remove_candidate_set;
+ RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
uint32_t _max_documents_removed;
bool _is_done;
- static RemoveCandidateSet steal_selection_matches_as_set(api::RemoveLocationReply& reply);
+ static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);
void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;