aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
blob: fbe1c142b09a307aff17b8088d2d1b90ccf4cbf8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "garbagecollectionoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storageapi/message/removelocation.h>

#include <vespa/log/log.h>
// NOTE(review): the log component name below ends in ".remove" although this
// file implements the garbage collection operation. Possibly carried over from
// another operation's source file - confirm before changing, as existing log
// configuration may refer to this exact name.
LOG_SETUP(".distributor.operation.idealstate.remove");

namespace storage::distributor {

// Constructs a garbage collection operation.
//
// `nodes` is forwarded to the IdealStateOperation base and `cluster_ctx` is
// used to construct the message tracker. The operation starts out with no
// collected replica info and a max removed-document count of zero.
GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& cluster_ctx,
                                                       const BucketAndNodes& nodes)
    : IdealStateOperation(nodes),
      _tracker(cluster_ctx),
      _replica_info(),
      _max_documents_removed(0)
{
}

// Destructor is compiler-generated, but defined out-of-line in this source file.
GarbageCollectionOperation::~GarbageCollectionOperation() = default;

// Starts the operation: looks up this operation's bucket in the bucket
// database and queues one RemoveLocationCommand, carrying the configured
// garbage collection selection, for every node listed in the database entry.
// All queued commands are then flushed through `sender`.
//
// If the tracker already reports itself as finished after the flush
// (presumably because no commands were queued), the operation is completed
// immediately via done().
void GarbageCollectionOperation::onStart(DistributorStripeMessageSender& sender) {
    BucketDatabase::Entry db_entry = _bucketSpace->getBucketDatabase().get(getBucketId());
    const std::vector<uint16_t> target_nodes = db_entry->getNodes();

    for (const uint16_t target : target_nodes) {
        auto gc_cmd = std::make_shared<api::RemoveLocationCommand>(
                _manager->operation_context().distributor_config().getGarbageCollectionSelection(),
                getBucket());
        // Commands are sent with the priority assigned to this operation.
        gc_cmd->setPriority(_priority);
        _tracker.queueCommand(gc_cmd, target);
    }

    _tracker.flushQueue(sender);

    if (!_tracker.finished()) {
        return; // Replies still outstanding; completion happens in onReceive().
    }
    done();
}

// Handles a single reply to a previously sent RemoveLocationCommand.
//
// The reply must be an api::RemoveLocationReply (asserted). A successful reply
// contributes a replica info entry (fresh unique timestamp, replying node and
// the bucket info from the reply) and may raise the running maximum of
// removed documents. A failed reply marks the whole operation as not OK.
//
// Once the tracker reports that all replies are in, the collected replica
// info is merged into the bucket database (only if no reply failed), the GC
// metrics are updated, and the operation is completed via done().
void
GarbageCollectionOperation::onReceive(DistributorStripeMessageSender&,
                                      const std::shared_ptr<api::StorageReply>& reply)
{
    auto* gc_reply = dynamic_cast<api::RemoveLocationReply*>(reply.get());
    assert(gc_reply != nullptr);

    const uint16_t from_node = _tracker.handleReply(*gc_reply);

    if (gc_reply->getResult().failed()) {
        _ok = false;
    } else {
        _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
                                   from_node, gc_reply->getBucketInfo());
        _max_documents_removed = std::max(_max_documents_removed, gc_reply->documents_removed());
    }

    if (!_tracker.finished()) {
        return; // Still waiting for replies from other nodes.
    }

    if (_ok) {
        merge_received_bucket_info_into_db();
    }
    // Metrics are updated regardless of whether every reply succeeded.
    update_gc_metrics();
    done();
}

void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
    // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
    _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
    BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
    if (dbentry.valid()) {
        dbentry->setLastGarbageCollectionTime(
                _manager->node_context().clock().getTimeInSeconds().getTime());
        _bucketSpace->getBucketDatabase().update(dbentry);
    }
}

// Adds the largest per-reply removed-document count seen by this operation to
// the `documents_removed` metric of the garbage collection metric set.
//
// The metric registered for GARBAGE_COLLECTION must be a GcMetricSet (asserted).
void GarbageCollectionOperation::update_gc_metrics() {
    auto gc_metrics = std::dynamic_pointer_cast<GcMetricSet>(
            _manager->getMetrics().operations[IdealStateOperation::GARBAGE_COLLECTION]);
    assert(gc_metrics);
    gc_metrics->documents_removed.inc(_max_documents_removed);
}

// Always returns true; both arguments are ignored.
bool GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const
{
    return true;
}

}