about summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-09-06 14:19:10 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-09-06 14:36:11 +0000
commitc4701a64fa95c00e4133a29fadc84a0771b34831 (patch)
tree9c4932135e93bcaeb751ab8f41e5d1974d82c04d /storage
parent1d5273dfa4da1f625fe8456030288ef5eac634cf (diff)
Make two-phase GC work for parent-child with subset of replicas indexed
The previous iteration of GC 1st phase candidate set computation required _all_ replicas to agree that a particular document should be removed for it to be passed on to the second phase. I.e. the intersection of all nodes' document sets. This does not work as expected when the GC expression references imported fields _and_ `searchable-copies` is less than `redundancy`, as the required index structures are not present across all replicas. The result was that eligible documents were never removed. This commit changes the candidate set semantics to instead use a union of document IDs, using the maximum observed timestamp in the case of conflicts for the same ID. This mirrors the end result of the legacy behavior, but does not require merging in order to propagate tombstones from the indexed replicas to those without. It also greatly simplifies the candidate computation code.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp58
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp61
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h15
3 files changed, 46 insertions, 88 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 4f9fd25098b..1a104727f43 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -26,7 +26,7 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
spi::IdAndTimestamp _e2;
spi::IdAndTimestamp _e3;
spi::IdAndTimestamp _e4;
-
+ spi::IdAndTimestamp _e5;
GarbageCollectionOperationTest()
: _bucket_id(16, 1),
@@ -35,7 +35,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
_e1(DocumentId("id:foo:bar::doc-1"), spi::Timestamp(100)),
_e2(DocumentId("id:foo:bar::doc-2"), spi::Timestamp(200)),
_e3(DocumentId("id:foo:bar::doc-3"), spi::Timestamp(300)),
- _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400))
+ _e4(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(400)),
+ _e5(DocumentId("id:foo:bar::doc-4"), spi::Timestamp(500)) // Same as e4 but with higher timestamp
{}
void SetUp() override {
@@ -225,16 +226,16 @@ TEST_F(GarbageCollectionOperationTest, first_phase_sends_enumerate_only_remove_l
}
}
-TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_returned_entries_with_feed_pri) {
+TEST_F(GarbageCollectionOperationTest, second_phase_sends_highest_timestamped_union_of_returned_entries_with_feed_pri) {
enable_two_phase_gc();
auto op = create_op();
op->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ(2, _sender.commands().size());
auto r1 = make_remove_location_reply(*_sender.command(0));
- r1->set_selection_matches({_e1, _e2, _e3});
+ r1->set_selection_matches({_e1, _e2, _e3, _e5});
auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+ r2->set_selection_matches({_e2, _e3, _e4});
_sender.commands().clear();
op->receive(_sender, r1);
@@ -242,7 +243,8 @@ TEST_F(GarbageCollectionOperationTest, second_phase_sends_intersection_of_return
op->receive(_sender, r2);
ASSERT_EQ(2u, _sender.commands().size()); // Phase 2 sent
- std::vector<spi::IdAndTimestamp> expected({_e2, _e3});
+ // e5 is same doc as e4, but at a higher timestamp; only e5 entry should be included.
+ std::vector<spi::IdAndTimestamp> expected({_e1, _e2, _e3, _e5});
for (int i : {0, 1}) {
auto cmd = as_remove_location_command(_sender.command(i));
EXPECT_FALSE(cmd->only_enumerate_docs());
@@ -267,47 +269,6 @@ TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_no_res
EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
}
-TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_has_results_but_intersection_is_empty) {
- enable_two_phase_gc();
- auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ(2, _sender.commands().size());
-
- // No docs in common
- auto r1 = make_remove_location_reply(*_sender.command(0));
- r1->set_selection_matches({_e1});
- auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e2});
-
- _sender.commands().clear();
- op->receive(_sender, r1);
- op->receive(_sender, r2);
-
- EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
-}
-
-// We explicitly test the case where the first reply has an empty result set since we internally
-// establish the baseline candidate set from the first reply. This test case leaks some internal
-// implementation details, but such is life.
-TEST_F(GarbageCollectionOperationTest, no_second_phase_if_first_phase_intersection_empty_first_reply_is_empty_case) {
- enable_two_phase_gc();
- auto op = create_op();
- op->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ(2, _sender.commands().size());
-
- auto r1 = make_remove_location_reply(*_sender.command(0));
- r1->set_selection_matches({});
- auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e1, _e2, _e3, _e4});
-
- _sender.commands().clear();
- op->receive(_sender, r1);
- op->receive(_sender, r2);
-
- EXPECT_NO_FATAL_FAILURE(assert_gc_op_completed_ok_without_second_phase(*op));
-}
-
-
TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_second_phase_completion) {
enable_two_phase_gc();
auto op = create_op();
@@ -317,7 +278,7 @@ TEST_F(GarbageCollectionOperationTest, db_metrics_and_timestamp_are_updated_on_s
auto r1 = make_remove_location_reply(*_sender.command(0));
r1->set_selection_matches({_e1, _e2, _e3});
auto r2 = make_remove_location_reply(*_sender.command(1));
- r2->set_selection_matches({_e2, _e3, _e4}); // e2, e3 in common with r1
+ r2->set_selection_matches({_e2, _e3, _e4});
_sender.commands().clear();
op->receive(_sender, r1);
@@ -421,6 +382,7 @@ TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_an
_sender.commands().clear();
op->receive(_sender, r1);
op->receive(_sender, r2);
+ ASSERT_EQ(2, _sender.commands().size());
// Locks on e1 and e3 are held while GC removes are sent
auto e1_lock = _operation_sequencer.try_acquire(FixedBucketSpaces::default_space(), _e1.id);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 3e82b6b10a7..d3e4e49c193 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -7,7 +7,7 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storageapi/message/removelocation.h>
-#include <vespa/vespalib/stllike/hash_set.hpp>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <algorithm>
#include <vespa/log/log.h>
@@ -20,8 +20,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_tracker(cluster_ctx),
_phase(Phase::NotStarted),
_cluster_state_version_at_phase1_start_time(0),
- _phase1_replies_received(0),
- _remove_candidate_set(),
+ _remove_candidates(),
_replica_info(),
_max_documents_removed(0),
_is_done(false)
@@ -50,7 +49,11 @@ bool GarbageCollectionOperation::all_involved_nodes_support_two_phase_gc() const
}
std::vector<spi::IdAndTimestamp> GarbageCollectionOperation::compile_phase_two_send_set() const {
- std::vector<spi::IdAndTimestamp> docs_to_remove(_remove_candidate_set.begin(), _remove_candidate_set.end());
+ std::vector<spi::IdAndTimestamp> docs_to_remove;
+ docs_to_remove.reserve(_remove_candidates.size());
+ for (const auto& cand : _remove_candidates) {
+ docs_to_remove.emplace_back(cand.first, cand.second);
+ }
// Use timestamp order to provide test determinism and allow for backend linear merging (if needed).
// Tie-break on GID upon collisions (which technically should never happen...!)
auto ts_then_gid_order = [](const spi::IdAndTimestamp& lhs, const spi::IdAndTimestamp& rhs) noexcept {
@@ -155,36 +158,24 @@ void GarbageCollectionOperation::handle_ok_legacy_reply(uint16_t from_node, cons
update_replica_response_info_from_reply(from_node, reply);
}
-GarbageCollectionOperation::RemoveCandidateSet
-GarbageCollectionOperation::steal_selection_matches_as_set(api::RemoveLocationReply& reply) {
+GarbageCollectionOperation::RemoveCandidates
+GarbageCollectionOperation::steal_selection_matches_as_candidates(api::RemoveLocationReply& reply) {
auto candidates = reply.steal_selection_matches();
- RemoveCandidateSet set;
- set.resize(candidates.size());
+ RemoveCandidates as_map;
+ as_map.resize(candidates.size());
for (auto& cand : candidates) {
- set.insert(std::move(cand));
+ as_map.insert(std::make_pair(std::move(cand.id), cand.timestamp));
}
- return set;
+ return as_map;
}
void GarbageCollectionOperation::handle_ok_phase1_reply(api::RemoveLocationReply& reply) {
assert(reply.documents_removed() == 0);
- if (_phase1_replies_received == 0) {
- // Establish baseline candidate set. Since we require an intersection between all
- // sets, the number of candidates can never be _greater_ than that of the first reply.
- _remove_candidate_set = steal_selection_matches_as_set(reply);
- } else if (!_remove_candidate_set.empty()) {
- auto their_set = steal_selection_matches_as_set(reply);
- std::vector<spi::IdAndTimestamp> to_remove;
- for (auto& our_cand : _remove_candidate_set) {
- if (!their_set.contains(our_cand)) {
- to_remove.emplace_back(our_cand);
- }
- }
- for (auto& rm_entry : to_remove) {
- _remove_candidate_set.erase(rm_entry);
- }
+ auto their_matches = steal_selection_matches_as_candidates(reply);
+ for (auto& new_cand : their_matches) {
+ auto& maybe_existing_ts = _remove_candidates[new_cand.first];
+ maybe_existing_ts = std::max(new_cand.second, maybe_existing_ts);
}
- ++_phase1_replies_received;
}
void GarbageCollectionOperation::handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
@@ -220,30 +211,30 @@ void GarbageCollectionOperation::on_metadata_read_phase_done(DistributorStripeMe
mark_operation_complete();
return;
}
- std::vector<spi::IdAndTimestamp> already_pending_write;
- for (auto& cand : _remove_candidate_set) {
- auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.id);
+ std::vector<document::DocumentId> already_pending_write;
+ for (auto& cand : _remove_candidates) {
+ auto maybe_seq_token = sender.operation_sequencer().try_acquire(getBucket().getBucketSpace(), cand.first);
if (maybe_seq_token.valid()) {
_gc_write_locks.emplace_back(std::move(maybe_seq_token));
LOG(spam, "GC(%s): acquired write lock for '%s'; adding to GC set",
- getBucket().toString().c_str(), cand.id.toString().c_str());
+ getBucket().toString().c_str(), cand.first.toString().c_str());
} else {
- already_pending_write.emplace_back(cand);
+ already_pending_write.emplace_back(cand.first);
LOG(spam, "GC(%s): failed to acquire write lock for '%s'; not including in GC set",
- getBucket().toString().c_str(), cand.id.toString().c_str());
+ getBucket().toString().c_str(), cand.first.toString().c_str());
}
}
for (auto& rm_entry : already_pending_write) {
- _remove_candidate_set.erase(rm_entry);
+ _remove_candidates.erase(rm_entry);
}
- if (_remove_candidate_set.empty()) {
+ if (_remove_candidates.empty()) {
update_last_gc_timestamp_in_db(); // Nothing to remove now, try again later.
mark_operation_complete();
return;
}
LOG(debug, "GC(%s): Sending phase 2 GC with %zu entries (with acquired write locks). "
"%zu documents had pending writes and could not be GCd at this time",
- getBucket().toString().c_str(), _remove_candidate_set.size(), already_pending_write.size());
+ getBucket().toString().c_str(), _remove_candidates.size(), already_pending_write.size());
transition_to(Phase::WriteRemovesPhase);
send_current_phase_remove_locations(sender);
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index adbbd210877..25308b0fb4b 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -2,11 +2,12 @@
#pragma once
#include "idealstateoperation.h"
+#include <vespa/document/base/documentid.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/persistence/spi/id_and_timestamp.h>
-#include <vespa/vespalib/stllike/hash_set.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
namespace storage::distributor {
@@ -41,18 +42,22 @@ private:
static const char* to_string(Phase phase) noexcept;
- using RemoveCandidateSet = vespalib::hash_set<spi::IdAndTimestamp, spi::IdAndTimestamp::hash>;
+ struct DocIdHasher {
+ size_t operator()(const document::DocumentId& id) const noexcept {
+ return document::GlobalId::hash()(id.getGlobalId());
+ }
+ };
+ using RemoveCandidates = vespalib::hash_map<document::DocumentId, spi::Timestamp, DocIdHasher>;
Phase _phase;
uint32_t _cluster_state_version_at_phase1_start_time;
- uint32_t _phase1_replies_received;
- RemoveCandidateSet _remove_candidate_set;
+ RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
uint32_t _max_documents_removed;
bool _is_done;
- static RemoveCandidateSet steal_selection_matches_as_set(api::RemoveLocationReply& reply);
+ static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);
void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;