summaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
blob: 9c2f97dd65524e1cbdf00b59cb30460352e92e23 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "bucketmovejob.h"
#include "heart_beat_job.h"
#include "job_tracked_maintenance_job.h"
#include "lid_space_compaction_job.h"
#include "lid_space_compaction_handler.h"
#include "maintenance_jobs_injector.h"
#include "prune_session_cache_job.h"
#include "pruneremoveddocumentsjob.h"
#include "sample_attribute_usage_job.h"

using vespalib::system_clock;

namespace proton {

namespace {

IMaintenanceJob::UP
trackJob(std::shared_ptr<IJobTracker> tracker, std::shared_ptr<IMaintenanceJob> job)
{
    return std::make_unique<JobTrackedMaintenanceJob>(std::move(tracker), std::move(job));
}

void
injectLidSpaceCompactionJobs(MaintenanceController &controller,
                             const DocumentDBMaintenanceConfig &config,
                             storage::spi::BucketExecutor & bucketExecutor,
                             ILidSpaceCompactionHandler::Vector lscHandlers,
                             IOperationStorer &opStorer,
                             std::shared_ptr<IJobTracker> tracker,
                             IDiskMemUsageNotifier &diskMemUsageNotifier,
                             IClusterStateChangedNotifier &clusterStateChangedNotifier,
                             const std::shared_ptr<IBucketStateCalculator> &calc,
                             document::BucketSpace bucketSpace)
{
    for (auto &lidHandler : lscHandlers) {
        auto job = lidspace::CompactionJob::create(config.getLidSpaceCompactionConfig(), controller.retainDB(),
                                                   std::move(lidHandler), opStorer, controller.masterThread(),
                                                   bucketExecutor, diskMemUsageNotifier,config.getBlockableJobConfig(),
                                                   clusterStateChangedNotifier, calc && calc->nodeRetired(), bucketSpace);
        controller.registerJobInMasterThread(trackJob(tracker, std::move(job)));
    }
}

void
injectBucketMoveJob(MaintenanceController &controller,
                    const DocumentDBMaintenanceConfig &config,
                    storage::spi::BucketExecutor & bucketExecutor,
                    bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
                    const vespalib::string &docTypeName,
                    document::BucketSpace bucketSpace,
                    IDocumentMoveHandler &moveHandler,
                    IBucketModifiedHandler &bucketModifiedHandler,
                    IClusterStateChangedNotifier &clusterStateChangedNotifier,
                    IBucketStateChangedNotifier &bucketStateChangedNotifier,
                    std::shared_ptr<IBucketStateCalculator> calc,
                    DocumentDBJobTrackers &jobTrackers,
                    IDiskMemUsageNotifier &diskMemUsageNotifier)
{
    auto bmj = BucketMoveJob::create(std::move(calc), controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(),
                                     bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(),
                                     bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
                                     diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace);
    controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj)));
}

}

void
MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
                                    const DocumentDBMaintenanceConfig &config,
                                    storage::spi::BucketExecutor & bucketExecutor,
                                    IHeartBeatHandler &hbHandler,
                                    matching::ISessionCachePruner &scPruner,
                                    IOperationStorer &opStorer,
                                    bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
                                    document::BucketSpace bucketSpace,
                                    IPruneRemovedDocumentsHandler &prdHandler,
                                    IDocumentMoveHandler &moveHandler,
                                    IBucketModifiedHandler &bucketModifiedHandler,
                                    IClusterStateChangedNotifier &clusterStateChangedNotifier,
                                    IBucketStateChangedNotifier &bucketStateChangedNotifier,
                                    const std::shared_ptr<IBucketStateCalculator> &calc,
                                    IDiskMemUsageNotifier &diskMemUsageNotifier,
                                    DocumentDBJobTrackers &jobTrackers,
                                    IAttributeManagerSP readyAttributeManager,
                                    IAttributeManagerSP notReadyAttributeManager,
                                    AttributeUsageFilter &attributeUsageFilter)
{
    controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig()));
    controller.registerJobInSharedExecutor(
            std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));

    const auto & docTypeName = controller.getDocTypeName().getName();
    const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB());

    controller.registerJobInMasterThread(
            trackJob(jobTrackers.getRemovedDocumentsPrune(),
                     PruneRemovedDocumentsJob::create(config.getPruneRemovedDocumentsConfig(), controller.retainDB(),
                                                      *mRemSubDB.meta_store(), mRemSubDB.sub_db_id(), bucketSpace,
                                                      docTypeName, prdHandler, controller.masterThread(),
                                                      bucketExecutor)));


    if (!config.getLidSpaceCompactionConfig().isDisabled()) {
        ILidSpaceCompactionHandler::Vector lidSpaceCompactionHandlers;
        lidSpaceCompactionHandlers.push_back(std::make_shared<LidSpaceCompactionHandler>(controller.getReadySubDB(), docTypeName));
        lidSpaceCompactionHandlers.push_back(std::make_shared<LidSpaceCompactionHandler>(controller.getRemSubDB(), docTypeName));
        lidSpaceCompactionHandlers.push_back(std::make_shared<LidSpaceCompactionHandler>(controller.getNotReadySubDB(), docTypeName));
        injectLidSpaceCompactionJobs(controller, config, bucketExecutor, std::move(lidSpaceCompactionHandlers),
                                     opStorer, jobTrackers.getLidSpaceCompact(), diskMemUsageNotifier,
                                     clusterStateChangedNotifier, calc, bucketSpace);
    }

    injectBucketMoveJob(controller, config, bucketExecutor, bucketCreateNotifier, docTypeName, bucketSpace,
                        moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier,
                        calc, jobTrackers, diskMemUsageNotifier);

    controller.registerJobInMasterThread(
            std::make_unique<SampleAttributeUsageJob>(std::move(readyAttributeManager),
                                                      std::move(notReadyAttributeManager),
                                                      attributeUsageFilter, docTypeName,
                                                      config.getAttributeUsageSampleInterval()));
}

} // namespace proton