aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/maintenance/simplemaintenancescanner.cpp
blob: ab27f2d2e43cf7c8f64904ef426b887b2c227a84 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "simplemaintenancescanner.h"
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <ostream>
#include <cassert>

namespace storage::distributor {

SimpleMaintenanceScanner::SimpleMaintenanceScanner(BucketPriorityDatabase& bucketPriorityDb,
                                                   const MaintenancePriorityGenerator& priorityGenerator,
                                                   const DistributorBucketSpaceRepo& bucketSpaceRepo)
    : _bucketPriorityDb(bucketPriorityDb),
      _priorityGenerator(priorityGenerator),
      _bucketSpaceRepo(bucketSpaceRepo),
      _bucketSpaceItr(_bucketSpaceRepo.begin()),
      _bucketCursor(),
      _pendingMaintenance()
{
}

// Compiler-generated destructor, defined out of line in this translation unit.
SimpleMaintenanceScanner::~SimpleMaintenanceScanner() = default;

/**
 * Two GlobalMaintenanceStats compare equal exactly when their `pending`
 * members compare equal.
 */
bool
SimpleMaintenanceScanner::GlobalMaintenanceStats::operator==(const GlobalMaintenanceStats& rhs) const noexcept
{
    const auto& mine = pending;
    const auto& theirs = rhs.pending;
    return mine == theirs;
}

/**
 * Adds every counter in rhs.pending onto the counter at the same position
 * in this object's `pending`.
 */
void
SimpleMaintenanceScanner::GlobalMaintenanceStats::merge(const GlobalMaintenanceStats& rhs) noexcept
{
    // Walk rhs.pending in lock-step with our own counters.
    auto other = rhs.pending.begin();
    for (auto& count : pending) {
        count += *other;
        ++other;
    }
}

/**
 * Folds the statistics of rhs into this object: first the global counters,
 * then the per-node statistics, each through its own merge().
 */
void
SimpleMaintenanceScanner::PendingMaintenanceStats::merge(const PendingMaintenanceStats& rhs)
{
    const auto& otherGlobal = rhs.global;
    const auto& otherPerNode = rhs.perNodeStats;

    global.merge(otherGlobal);
    perNodeStats.merge(otherPerNode);
}

// Special member functions of PendingMaintenanceStats. All of them are
// compiler-generated (= default) but defined out of line here.
// Note that no copy assignment operator is defined in this file; only the
// move assignment operator is.

// Default constructor.
SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats() noexcept = default;
// Destructor.
SimpleMaintenanceScanner::PendingMaintenanceStats::~PendingMaintenanceStats() = default;
// Copy constructor.
SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(const PendingMaintenanceStats &) = default;
// Move constructor.
SimpleMaintenanceScanner::PendingMaintenanceStats::PendingMaintenanceStats(PendingMaintenanceStats &&) noexcept = default;
// Move assignment operator.
SimpleMaintenanceScanner::PendingMaintenanceStats &
SimpleMaintenanceScanner::PendingMaintenanceStats::operator = (PendingMaintenanceStats &&) noexcept = default;

/**
 * Hands the currently accumulated statistics to the caller and leaves this
 * object holding fresh ones.
 *
 * The current contents are moved into the returned object. Afterwards the
 * global counters are replaced by a default-constructed
 * GlobalMaintenanceStats, and the per-node statistics are reset using the
 * node count reported by the statistics that were just moved out.
 */
SimpleMaintenanceScanner::PendingMaintenanceStats
SimpleMaintenanceScanner::PendingMaintenanceStats::fetch_and_reset() {
    PendingMaintenanceStats snapshot = std::move(*this);
    // *this is now moved-from; give both members a well-defined fresh state.
    global = GlobalMaintenanceStats{};
    const auto nodeCount = snapshot.perNodeStats.numNodes();
    perNodeStats.reset(nodeCount);
    return snapshot;
}

/**
 * Advances the scan by a single bucket.
 *
 * Starting from the current bucket space, asks that space's bucket database
 * for the entry returned by getNext() for the current cursor. If the entry
 * is not valid, the scan moves on to the next bucket space with a
 * default-constructed cursor and tries again. For a valid entry, the bucket
 * is counted and prioritized, the cursor is set to its bucket id, and a
 * "not done" result carrying the bucket space and the entry is returned.
 *
 * Once the bucket space iterator has reached the end of the repo, a "done"
 * result is returned.
 */
MaintenanceScanner::ScanResult
SimpleMaintenanceScanner::scanNext()
{
    while (_bucketSpaceItr != _bucketSpaceRepo.end()) {
        const auto &space = _bucketSpaceItr->first;
        const auto &db = _bucketSpaceItr->second->getBucketDatabase();
        BucketDatabase::Entry next(db.getNext(_bucketCursor));
        if (next.valid()) {
            countBucket(space, next.getBucketInfo());
            prioritizeBucket(document::Bucket(space, next.getBucketId()));
            // Remember where we are before the entry is moved into the result.
            _bucketCursor = next.getBucketId();
            return ScanResult::createNotDone(space, std::move(next));
        }
        // Nothing more in this bucket space; restart the cursor in the next one.
        ++_bucketSpaceItr;
        _bucketCursor = document::BucketId();
    }
    return ScanResult::createDone();
}

/**
 * Rewinds the scanner and returns the statistics gathered so far.
 *
 * The bucket space iterator is put back at the first bucket space of the
 * repo and the bucket cursor is set to a default-constructed bucket id.
 * The returned value is whatever the pending maintenance statistics'
 * own fetch_and_reset() yields.
 */
SimpleMaintenanceScanner::PendingMaintenanceStats
SimpleMaintenanceScanner::fetch_and_reset()
{
    _bucketSpaceItr = _bucketSpaceRepo.begin();
    _bucketCursor = document::BucketId{};
    return _pendingMaintenance.fetch_and_reset();
}

/**
 * Registers one bucket in the per-node statistics.
 *
 * For every copy listed in `info`, incTotal() is called on the per-node
 * statistics tracker with the node of that copy and the given bucket space.
 */
void
SimpleMaintenanceScanner::countBucket(document::BucketSpace bucketSpace, const BucketInfo &info)
{
    auto &tracker = _pendingMaintenance.perNodeStats;
    const uint32_t copies = info.getNodeCount();
    uint32_t idx = 0;
    while (idx < copies) {
        tracker.incTotal(info.getNodeRef(idx).getNode(), bucketSpace);
        ++idx;
    }
}

/**
 * Asks the priority generator to prioritize the bucket, passing along the
 * per-node statistics.
 *
 * When the result says maintenance is required, the bucket is stored in the
 * priority database with the resulting priority, and the global pending
 * counter for the resulting operation type is incremented. Otherwise
 * nothing is recorded.
 */
void
SimpleMaintenanceScanner::prioritizeBucket(const document::Bucket &bucket)
{
    MaintenancePriorityAndType outcome(_priorityGenerator.prioritize(bucket, _pendingMaintenance.perNodeStats));
    if (!outcome.requiresMaintenance()) {
        return;
    }
    _bucketPriorityDb.setPriority(PrioritizedBucket(bucket, outcome.getPriority().getPriority()));
    // OPERATION_COUNT must never be used as an index into the pending counters.
    assert(outcome.getType() != MaintenanceOperation::OPERATION_COUNT);
    auto &pendingForType = _pendingMaintenance.global.pending[outcome.getType()];
    ++pendingForType;
}

/**
 * Writes the pending counters as a comma separated list of
 * "<operation name>: <count>" pairs, in the order delete bucket, merge
 * bucket, split bucket, join bucket, set bucket state, garbage collection.
 * Returns the stream that was passed in.
 */
std::ostream&
operator<<(std::ostream& os, const SimpleMaintenanceScanner::GlobalMaintenanceStats& stats)
{
    const auto& counts = stats.pending;
    os << "delete bucket: " << counts[MaintenanceOperation::DELETE_BUCKET];
    os << ", merge bucket: " << counts[MaintenanceOperation::MERGE_BUCKET];
    os << ", split bucket: " << counts[MaintenanceOperation::SPLIT_BUCKET];
    os << ", join bucket: " << counts[MaintenanceOperation::JOIN_BUCKET];
    os << ", set bucket state: " << counts[MaintenanceOperation::SET_BUCKET_STATE];
    os << ", garbage collection: " << counts[MaintenanceOperation::GARBAGE_COLLECTION];
    return os;
}

}