blob: 6cd61ba21a5e0fb393c0ad1ce8f763f8e2796fb7 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "idealstateoperation.h"
#include <vespa/document/base/documentid.h>
#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
namespace storage::distributor {
class PendingMessageTracker;
/**
 * Ideal state operation that removes documents from a bucket's replicas by
 * sending RemoveLocation commands to the involved nodes.
 *
 * The operation runs in one of two modes (see Phase):
 *  - LegacySinglePhase: a single round of RemoveLocation commands.
 *  - Two-phase: a ReadMetadataPhase that collects removal candidates
 *    (document ID + timestamp) from the replies, followed by a
 *    WriteRemovesPhase that sends the compiled candidate set back out.
 *
 * NOTE(review): the criteria for choosing between the modes (presumably
 * all_involved_nodes_support_two_phase_gc()) live in the .cpp; confirm there.
 */
class GarbageCollectionOperation final : public IdealStateOperation {
public:
    GarbageCollectionOperation(const ClusterContext& cluster_ctx,
                               const BucketAndNodes& nodes);
    ~GarbageCollectionOperation() override;

    void onStart(DistributorStripeMessageSender& sender) override;
    void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
    const char* getName() const noexcept override { return "garbagecollection"; }
    Type getType() const noexcept override { return GARBAGE_COLLECTION; }
    bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;

    /// True iff the operation is currently in one of the two-phase GC phases
    /// (ReadMetadataPhase or WriteRemovesPhase).
    [[nodiscard]] bool is_two_phase() const noexcept {
        return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
    }
    /// Returns the operation's completion flag.
    [[nodiscard]] bool is_done() const noexcept { return _is_done; }

private:
    // The class is final, so this was never reachable from a subclass. It is
    // private instead of protected; the declaration order is unchanged.
    MessageTracker _tracker;

    /// Current execution phase of the operation.
    enum class Phase {
        NotStarted,
        LegacySinglePhase,
        ReadMetadataPhase,
        WriteRemovesPhase
    };
    static const char* to_string(Phase phase) noexcept;

    /// Hashes a DocumentId via its GlobalId.
    struct DocIdHasher {
        size_t operator()(const document::DocumentId& id) const noexcept {
            return document::GlobalId::hash()(id.getGlobalId());
        }
    };
    /// Removal candidates keyed by document ID, mapped to their timestamp.
    using RemoveCandidates = vespalib::hash_map<document::DocumentId, spi::Timestamp, DocIdHasher>;

    Phase _phase;
    // NOTE(review): presumably compared against the current cluster state
    // version in may_start_write_phase() -- confirm in the .cpp.
    uint32_t _cluster_state_version_at_phase1_start_time;
    RemoveCandidates _remove_candidates;
    // Sequencing handles held by the operation; presumably per-document write
    // locks taken before the write phase -- TODO confirm.
    std::vector<SequencingHandle> _gc_write_locks;
    // Replica bucket info collected from replies (see
    // update_replica_response_info_from_reply / merge_received_bucket_info_into_db).
    std::vector<BucketCopy> _replica_info;
    // NOTE(review): likely the max removal count reported across replies; verify.
    uint32_t _max_documents_removed;
    bool _is_done;

    /// Moves the selection matches out of a phase 1 reply into a RemoveCandidates map.
    static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);
    void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
    /// Builds the (document ID, timestamp) set sent during the write phase.
    [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
    void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
    void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
    void handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
    void update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
    void on_metadata_read_phase_done(DistributorStripeMessageSender& sender);
    [[nodiscard]] bool may_start_write_phase() const;
    [[nodiscard]] bool all_involved_nodes_support_two_phase_gc() const noexcept;
    void update_last_gc_timestamp_in_db();
    void merge_received_bucket_info_into_db();
    void update_gc_metrics();
    void mark_operation_complete();
    void transition_to(Phase new_phase);
};
}
|