aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
blob: 27f735379d97241914b70a0234902bb03d33ad19 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "lid_space_compaction_job.h"
#include "i_document_scan_iterator.h"
#include "i_lid_space_compaction_handler.h"
#include "i_operation_storer.h"
#include "i_disk_mem_usage_notifier.h"
#include "iclusterstatechangednotifier.h"
#include "remove_operations_rate_tracker.h"
#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h>
#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchcorespi/index/i_thread_service.h>
#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/gate.h>
#include <cassert>

#include <vespa/log/log.h>
LOG_SETUP(".proton.server.lidspace.compactionjob");

using search::DocumentMetaData;
using search::LidUsageStats;
using storage::spi::makeBucketTask;
using storage::spi::Bucket;
using vespalib::RetainGuard;
using vespalib::makeLambdaTask;

namespace proton::lidspace {

namespace {

// Tells whether two meta data snapshots describe the very same stored document,
// i.e. lid, bucket id, gid and timestamp all compare equal.
bool
isSameDocument(const search::DocumentMetaData &a, const search::DocumentMetaData &b) {
    if (!(a.lid == b.lid)) {
        return false;
    }
    if (!(a.bucketId == b.bucketId)) {
        return false;
    }
    if (!(a.gid == b.gid)) {
        return false;
    }
    // Timestamp check can be removed once logic has proved itself in large scale.
    return (a.timestamp == b.timestamp);
}

}

/**
 * Bucket task that initiates the move of a single document on behalf of a CompactionJob.
 *
 * It carries the owning job, a copy of the document meta data taken when the task
 * was created, and the operation tracker callback handed in by the creator.
 */
class CompactionJob::MoveTask : public storage::spi::BucketTask {
public:
    MoveTask(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & meta, IDestructorCallback::SP opsTracker)
        : _job(std::move(job)),
          _meta(meta),
          _opsTracker(std::move(opsTracker))
    { }
    // Forwards the job and meta data to CompactionJob::moveDocument.
    // The ops tracker and onDone callbacks are bundled into one shared context object
    // (presumably so both are released together when the context goes away - confirm
    // against vespalib::KeepAlive).
    void run(const Bucket & bucket, IDestructorCallback::SP onDone) override {
        assert(bucket.getBucketId() == _meta.bucketId);
        using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
        CompactionJob::moveDocument(std::move(_job), _meta,
                                    std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
    }
    // Unless the job is stopped, posts a task to the job's _master thread service
    // that resets the job's scan iterator.
    void fail(const Bucket & bucket) override {
        assert(bucket.getBucketId() == _meta.bucketId);
        // Take the reference before _job is moved into the lambda capture below.
        auto & master = _job->_master;
        if (_job->stopped()) return;
        master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); }));
    }
private:
    std::shared_ptr<CompactionJob> _job;        // job this task works for; moved away by run()/fail()
    const search::DocumentMetaData _meta;       // meta data of the document to move, as seen at task creation
    IDestructorCallback::SP        _opsTracker; // callback obtained by the creator when the task was set up
};

bool
CompactionJob::scanDocuments(const LidUsageStats &stats)
{
    if (_scanItr->valid()) {
        DocumentMetaData document = getNextDocument(stats);
        if (document.valid()) {
            Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId));
            _bucketExecutor.execute(metaBucket, std::make_unique<MoveTask>(shared_from_this(), document, getLimiter().beginOperation()));
            if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
                return true;
            }
        }
    }
    return false;
}

/**
 * First half of a document move: creates the move operation for the given
 * meta data and, if it is still consistent, hands the rest of the work
 * (completeMove) over to the job's master thread service.
 *
 * Returns without doing anything when the job is stopped, when no move
 * operation / document could be created, or when the gid of the created
 * document does not match the meta data.
 */
void
CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen,
                            std::shared_ptr<IDestructorCallback> context)
{
    //TODO Remove once lidtracker is no longer in use.
    if (job->stopped()) {
        return;
    }
    // The real lid must be sampled in the master thread.
    //TODO remove target lid from createMoveOperation interface
    auto pendingMove = job->_handler->createMoveOperation(metaThen, 0);
    if (!pendingMove) {
        return;
    }
    if (!pendingMove->getDocument()) {
        return;
    }
    // Early detection and force md5 calculation outside of master thread
    if (metaThen.gid != pendingMove->getDocument()->getId().getGlobalId()) {
        return;
    }

    // Fetch the reference before the job pointer is moved into the closure.
    auto & master = job->_master;
    if (job->stopped()) {
        return;
    }
    auto finishInMaster = [self=std::move(job), meta=metaThen, moveOp=std::move(pendingMove), onDone=std::move(context)]() mutable {
        if (self->stopped()) return;
        self->completeMove(meta, std::move(moveOp), std::move(onDone));
    };
    master.execute(makeLambdaTask(std::move(finishInMaster)));
}

/**
 * Second half of a document move. Re-validates the document against the
 * current meta data, picks the lowest free lid as target, then stores the
 * operation and lets the handler perform the move.
 *
 * Nothing is done when the document changed since the move was initiated or
 * when the lowest free lid is not below the document's current lid.
 */
void
CompactionJob::completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp,
                            std::shared_ptr<IDestructorCallback> onDone)
{
    // Reread meta data as document might have been altered after move was initiated
    // If so it will fail the timestamp sanity check later on.
    search::DocumentMetaData current = _handler->getMetaData(metaThen.lid);
    // This should be impossible and should probably be an assert
    if (!isSameDocument(metaThen, current)) {
        return;
    }
    if (current.gid != moveOp->getDocument()->getId().getGlobalId()) {
        return;
    }

    uint32_t targetLid = _handler->getLidStatus().getLowestFreeLid();
    if (targetLid >= current.lid) {
        return;
    }
    moveOp->setTargetLid(targetLid);
    // onDone is copied for the storer and then handed over to the handler.
    _opStorer.appendOperation(*moveOp, onDone);
    _handler->handleMove(*moveOp, std::move(onDone));
}

/**
 * Sets up a lid space compaction job named "lid_space_compaction.<handler name>".
 *
 * Besides storing its collaborators, the constructor registers the job as disk/mem
 * usage listener and as cluster state changed handler, blocks the job with reason
 * CLUSTER_STATE when nodeRetired is set, and installs the remove operations rate
 * tracker as operation listener on the handler.
 */
CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
                             RetainGuard dbRetainer,
                             std::shared_ptr<ILidSpaceCompactionHandler> handler,
                             IOperationStorer &opStorer,
                             IThreadService & master,
                             BucketExecutor & bucketExecutor,
                             IDiskMemUsageNotifier &diskMemUsageNotifier,
                             const BlockableMaintenanceJobConfig &blockableConfig,
                             IClusterStateChangedNotifier &clusterStateChangedNotifier,
                             bool nodeRetired,
                             document::BucketSpace bucketSpace)
    : BlockableMaintenanceJob("lid_space_compaction." + handler->getName(),
                              config.getDelay(), config.getInterval(), blockableConfig),
      IDiskMemUsageListener(),
      IClusterStateChangedHandler(),
      std::enable_shared_from_this<CompactionJob>(),
      _cfg(config),
      _handler(std::move(handler)),
      _opStorer(opStorer),
      _scanItr(), // no scan in progress initially
      _diskMemUsageNotifier(diskMemUsageNotifier),
      _clusterStateChangedNotifier(clusterStateChangedNotifier),
      _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(),
                                                                      config.get_remove_block_rate())),
      _is_disabled(false),
      _shouldCompactLidSpace(false),
      _master(master),
      _bucketExecutor(bucketExecutor),
      _dbRetainer(std::move(dbRetainer)),
      _bucketSpace(bucketSpace)
{
    // Registrations are undone in the destructor.
    _diskMemUsageNotifier.addDiskMemUsageListener(this);
    _clusterStateChangedNotifier.addClusterStateChangedHandler(this);
    if (nodeRetired) {
        // Start out blocked; notifyClusterStateChanged() un-blocks once the node is no longer retired.
        setBlocked(BlockedReason::CLUSTER_STATE);
    }
    _handler->set_operation_listener(_ops_rate_tracker);
}

// Unregisters the job from the two notifiers it registered with in the constructor,
// in the reverse order of registration.
CompactionJob::~CompactionJob() {
    _clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
    _diskMemUsageNotifier.removeDiskMemUsageListener(this);
}

std::shared_ptr<CompactionJob>
CompactionJob::create(const DocumentDBLidSpaceCompactionConfig &config,
                      RetainGuard dbRetainer,
                      std::shared_ptr<ILidSpaceCompactionHandler> handler,
                      IOperationStorer &opStorer,
                      IThreadService & master,
                      BucketExecutor & bucketExecutor,
                      IDiskMemUsageNotifier &diskMemUsageNotifier,
                      const BlockableMaintenanceJobConfig &blockableConfig,
                      IClusterStateChangedNotifier &clusterStateChangedNotifier,
                      bool nodeRetired,
                      document::BucketSpace bucketSpace)
{
    return std::shared_ptr<CompactionJob>(
            new CompactionJob(config, std::move(dbRetainer), std::move(handler), opStorer, master, bucketExecutor,
                              diskMemUsageNotifier, blockableConfig, clusterStateChangedNotifier, nodeRetired, bucketSpace),
            [&master](auto job) {
                auto failed = master.execute(makeLambdaTask([job]() { delete job; }));
                assert(!failed);
            });
}

// Asks the scan iterator for the next document, passing the larger of the
// lowest free lid and the number of used lids as the limit argument.
DocumentMetaData
CompactionJob::getNextDocument(const LidUsageStats &stats)
{
    const auto scanLimit = std::max(stats.getLowestFreeLid(), stats.getUsedLids());
    return _scanItr->next(scanLimit);
}

/**
 * Performs one step of lid space compaction.
 *
 * Steps, in order:
 *  - Do nothing while the job is blocked, or while remove batch / remove
 *    operations are above their thresholds (logged once per disabled period).
 *  - If a scan has reached its end, wait for pending operations, then either
 *    restart the scan or schedule the final lid space compaction.
 *  - Otherwise continue the scan, compact the lid space, or start a new scan
 *    when there is too much lid bloat.
 *
 * @return true when this run is done (nothing more to do right now),
 *         false when the job wants to be run again.
 */
bool
CompactionJob::run()
{
    if (isBlocked()) {
        return true; // indicate work is done since no work can be done
    }
    if (remove_batch_is_ongoing()) {
        // Note that we don't set the job as blocked as the decision to un-block it is not driven externally.
        if (!_is_disabled) {
            LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing",
                _handler->getName().c_str());
            _is_disabled = true;
        }
        return true;
    }
    if (remove_is_ongoing()) {
        // Note that we don't set the job as blocked as the decision to un-block it is not driven externally.
        if (!_is_disabled) {
            LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing",
                _handler->getName().c_str());
            _is_disabled = true;
        }
        return true;
    }
    if (_is_disabled) {
        LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing",
            _handler->getName().c_str());
        _is_disabled = false;
    }

    if (_scanItr && !_scanItr->valid()) {
        // Compare the pending count directly instead of first squeezing it into a bool.
        if (getLimiter().numPending() > 0) {
            // We must wait to decide if a rescan is necessary until all operations are completed
            return false;
        }
        LidUsageStats stats = _handler->getLidStatus();
        if (shouldRestartScanDocuments(stats)) {
            _scanItr = _handler->getIterator();
        } else {
            // Scan is complete; the actual compaction happens on the next run.
            _scanItr = IDocumentScanIterator::UP();
            _shouldCompactLidSpace = true;
            return false;
        }
    }

    // Sample the lid status again here; the branch above may have restarted the scan.
    LidUsageStats stats = _handler->getLidStatus();
    if (_scanItr) {
        return scanDocuments(stats);
    } else if (_shouldCompactLidSpace) {
        compactLidSpace(stats);
    } else if (hasTooMuchLidBloat(stats)) {
        assert(!_scanItr);
        _scanItr = _handler->getIterator();
        return scanDocuments(stats);
    }
    return true;
}

// Returns whether the remove operations rate tracker reports the remove batch
// rate as being above its threshold.
bool
CompactionJob::remove_batch_is_ongoing() const
{
    return _ops_rate_tracker->remove_batch_above_threshold();
}

// Returns whether the remove operations rate tracker reports the remove
// rate as being above its threshold.
bool
CompactionJob::remove_is_ongoing() const
{
    return _ops_rate_tracker->remove_above_threshold();
}

// Lid bloat is considered too high when the bloat and the bloat factor both reach
// their configured allowances and the lid limit is above the lowest free lid.
bool
CompactionJob::hasTooMuchLidBloat(const LidUsageStats &stats) const
{
    if (!(stats.getLidBloat() >= _cfg.getAllowedLidBloat())) {
        return false;
    }
    if (!(stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor())) {
        return false;
    }
    return (stats.getLidLimit() > stats.getLowestFreeLid());
}

// A finished scan is restarted when the highest used lid still exceeds the used
// lid count by more than the allowed bloat, and a free lid exists below the
// highest used lid.
bool
CompactionJob::shouldRestartScanDocuments(const LidUsageStats &stats) const
{
    const bool bloatAboveAllowed =
            (stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid();
    if (!bloatAboveAllowed) {
        return false;
    }
    return (stats.getLowestFreeLid() < stats.getHighestUsedLid());
}

void
CompactionJob::compactLidSpace(const LidUsageStats &stats)
{
    uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
    CompactLidSpaceOperation op(_handler->getSubDbId(), wantedLidLimit);
    vespalib::Gate gate;
    auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate));
    gate.await();
    _handler->handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result)));
    EventLogger::lidSpaceCompactionComplete(_handler->getName(), wantedLidLimit);
    _shouldCompactLidSpace = false;
}

// Disk/mem usage listener callback; forwards the new state to internalNotifyDiskMemUsage().
void
CompactionJob::notifyDiskMemUsage(DiskMemUsageState state)
{
    // Called by master write thread
    internalNotifyDiskMemUsage(state);
}

/**
 * Cluster state changed callback. Keeps the CLUSTER_STATE block reason in sync
 * with whether the node is retired: blocks the job when the node becomes
 * retired and un-blocks it when the node is no longer retired. Each transition
 * is logged; nothing happens when the state already matches.
 */
void
CompactionJob::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc)
{
    // Called by master write thread
    const bool retired = newCalc->nodeRetired();
    const bool blockedByClusterState = isBlocked(BlockedReason::CLUSTER_STATE);
    if (retired == blockedByClusterState) {
        return; // already in the wanted state
    }
    if (retired) {
        LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler->getName().c_str());
        setBlocked(BlockedReason::CLUSTER_STATE);
    } else {
        LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler->getName().c_str());
        unBlock(BlockedReason::CLUSTER_STATE);
    }
}

} // namespace proton::lidspace