aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
blob: 6cd61ba21a5e0fb393c0ad1ce8f763f8e2796fb7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "idealstateoperation.h"
#include <vespa/document/base/documentid.h>
#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>

namespace storage::distributor {

// Forward declaration only. No declaration in the visible part of this header
// refers to PendingMessageTracker.
// NOTE(review): possibly a leftover; confirm no includer depends on it before removing.
class PendingMessageTracker;

class GarbageCollectionOperation final : public IdealStateOperation {
public:
    GarbageCollectionOperation(const ClusterContext& cluster_ctx,
                               const BucketAndNodes& nodes);
    ~GarbageCollectionOperation() override;

    void onStart(DistributorStripeMessageSender& sender) override;
    void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
    const char* getName() const noexcept override { return "garbagecollection"; };
    Type getType() const noexcept override { return GARBAGE_COLLECTION; }
    bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
    [[nodiscard]] bool is_two_phase() const noexcept {
        return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
    }
    [[nodiscard]] bool is_done() const noexcept { return _is_done; }

protected:
    MessageTracker _tracker;
private:
    enum class Phase {
        NotStarted,
        LegacySinglePhase,
        ReadMetadataPhase,
        WriteRemovesPhase
    };

    static const char* to_string(Phase phase) noexcept;

    struct DocIdHasher {
        size_t operator()(const document::DocumentId& id) const noexcept {
            return document::GlobalId::hash()(id.getGlobalId());
        }
    };
    using RemoveCandidates = vespalib::hash_map<document::DocumentId, spi::Timestamp, DocIdHasher>;

    Phase                         _phase;
    uint32_t                      _cluster_state_version_at_phase1_start_time;
    RemoveCandidates              _remove_candidates;
    std::vector<SequencingHandle> _gc_write_locks;
    std::vector<BucketCopy>       _replica_info;
    uint32_t                      _max_documents_removed;
    bool                          _is_done;

    static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);

    void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
    [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;

    void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
    void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
    void handle_ok_phase2_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
    void update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
    void on_metadata_read_phase_done(DistributorStripeMessageSender& sender);
    [[nodiscard]] bool may_start_write_phase() const;
    [[nodiscard]] bool all_involved_nodes_support_two_phase_gc() const noexcept;
    void update_last_gc_timestamp_in_db();
    void merge_received_bucket_info_into_db();
    void update_gc_metrics();
    void mark_operation_complete();
    void transition_to(Phase new_phase);
};

}