1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketmovejob.h"
#include "heart_beat_job.h"
#include "job_tracked_maintenance_job.h"
#include "lid_space_compaction_job.h"
#include "lid_space_compaction_handler.h"
#include "maintenance_jobs_injector.h"
#include "prune_session_cache_job.h"
#include "pruneremoveddocumentsjob.h"
#include "sample_attribute_usage_job.h"
using vespalib::system_clock;
namespace proton {
namespace {
/**
 * Wraps a maintenance job in a JobTrackedMaintenanceJob.
 *
 * Both arguments are taken by value and moved into the wrapper.
 *
 * @param tracker tracker handed to the JobTrackedMaintenanceJob constructor.
 * @param job     the job being wrapped.
 * @return the newly created wrapper as an IMaintenanceJob::UP.
 */
IMaintenanceJob::UP
trackJob(std::shared_ptr<IJobTracker> tracker, std::shared_ptr<IMaintenanceJob> job)
{
    using TrackedJob = JobTrackedMaintenanceJob;
    return std::make_unique<TrackedJob>(std::move(tracker), std::move(job));
}
/**
 * Creates one lidspace::CompactionJob per entry in lscHandlers and passes each
 * job, wrapped through trackJob() with the supplied tracker, to
 * controller.registerJobInMasterThread().
 *
 * Every handler is moved out of lscHandlers into its job. The "retired" flag
 * given to each job is false when calc is empty, otherwise calc->nodeRetired().
 */
void
injectLidSpaceCompactionJobs(MaintenanceController &controller,
                             const DocumentDBMaintenanceConfig &config,
                             storage::spi::BucketExecutor & bucketExecutor,
                             ILidSpaceCompactionHandler::Vector lscHandlers,
                             IOperationStorer &opStorer,
                             std::shared_ptr<IJobTracker> tracker,
                             IDiskMemUsageNotifier &diskMemUsageNotifier,
                             IClusterStateChangedNotifier &clusterStateChangedNotifier,
                             const std::shared_ptr<IBucketStateCalculator> &calc,
                             document::BucketSpace bucketSpace)
{
    for (auto &handler : lscHandlers) {
        // A missing calculator is treated as "node not retired".
        const bool nodeRetired = calc && calc->nodeRetired();
        auto compactionJob = lidspace::CompactionJob::create(
                config.getLidSpaceCompactionConfig(),
                controller.retainDB(),
                std::move(handler),
                opStorer,
                controller.masterThread(),
                bucketExecutor,
                diskMemUsageNotifier,
                config.getBlockableJobConfig(),
                clusterStateChangedNotifier,
                nodeRetired,
                bucketSpace);
        // The tracker is copied (not moved) because it is reused for every handler.
        auto trackedJob = trackJob(tracker, std::move(compactionJob));
        controller.registerJobInMasterThread(std::move(trackedJob));
    }
}
/**
 * Creates a BucketMoveJob and passes it, wrapped through trackJob() with the
 * tracker returned by jobTrackers.getBucketMove(), to
 * controller.registerJobInMasterThread().
 *
 * The job is built from the controller's ready and not-ready sub databases,
 * its master thread, and the notifiers and handlers supplied by the caller.
 * calc is taken by value and moved into the job.
 */
void
injectBucketMoveJob(MaintenanceController &controller,
                    const DocumentDBMaintenanceConfig &config,
                    storage::spi::BucketExecutor & bucketExecutor,
                    bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
                    const vespalib::string &docTypeName,
                    document::BucketSpace bucketSpace,
                    IDocumentMoveHandler &moveHandler,
                    IBucketModifiedHandler &bucketModifiedHandler,
                    IClusterStateChangedNotifier &clusterStateChangedNotifier,
                    IBucketStateChangedNotifier &bucketStateChangedNotifier,
                    std::shared_ptr<IBucketStateCalculator> calc,
                    DocumentDBJobTrackers &jobTrackers,
                    IDiskMemUsageNotifier &diskMemUsageNotifier)
{
    auto moveJob = BucketMoveJob::create(
            std::move(calc),
            controller.retainDB(),
            moveHandler,
            bucketModifiedHandler,
            controller.masterThread(),
            bucketExecutor,
            controller.getReadySubDB(),
            controller.getNotReadySubDB(),
            bucketCreateNotifier,
            clusterStateChangedNotifier,
            bucketStateChangedNotifier,
            diskMemUsageNotifier,
            config.getBlockableJobConfig(),
            docTypeName,
            bucketSpace);
    auto trackedJob = trackJob(jobTrackers.getBucketMove(), std::move(moveJob));
    controller.registerJobInMasterThread(std::move(trackedJob));
}
}
/**
 * Creates the maintenance jobs for a document database and registers them
 * with the controller, in this order:
 *
 *  1. HeartBeatJob                    (registerJobInMasterThread)
 *  2. PruneSessionCacheJob            (registerJobInSharedExecutor)
 *  3. PruneRemovedDocumentsJob        (tracked, registerJobInMasterThread)
 *  4. lid space compaction jobs for the ready, removed and not-ready sub
 *     databases, skipped when the lid space compaction config is disabled
 *  5. BucketMoveJob                   (tracked, registerJobInMasterThread)
 *  6. SampleAttributeUsageJob         (registerJobInMasterThread)
 *
 * The two attribute managers are taken by value and moved into the
 * SampleAttributeUsageJob.
 */
void
MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
                                    const DocumentDBMaintenanceConfig &config,
                                    storage::spi::BucketExecutor & bucketExecutor,
                                    IHeartBeatHandler &hbHandler,
                                    matching::ISessionCachePruner &scPruner,
                                    IOperationStorer &opStorer,
                                    bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
                                    document::BucketSpace bucketSpace,
                                    IPruneRemovedDocumentsHandler &prdHandler,
                                    IDocumentMoveHandler &moveHandler,
                                    IBucketModifiedHandler &bucketModifiedHandler,
                                    IClusterStateChangedNotifier &clusterStateChangedNotifier,
                                    IBucketStateChangedNotifier &bucketStateChangedNotifier,
                                    const std::shared_ptr<IBucketStateCalculator> &calc,
                                    IDiskMemUsageNotifier &diskMemUsageNotifier,
                                    DocumentDBJobTrackers &jobTrackers,
                                    IAttributeManagerSP readyAttributeManager,
                                    IAttributeManagerSP notReadyAttributeManager,
                                    AttributeUsageFilter &attributeUsageFilter)
{
    // Heart beat job.
    auto heartBeatJob = std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig());
    controller.registerJobInMasterThread(std::move(heartBeatJob));

    // Session cache pruning is the only job registered in the shared executor.
    auto sessionPruneJob = std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval());
    controller.registerJobInSharedExecutor(std::move(sessionPruneJob));

    const auto & docTypeName = controller.getDocTypeName().getName();

    // Pruning of removed documents works on the removed sub database.
    const MaintenanceDocumentSubDB &removedSubDB = controller.getRemSubDB();
    auto pruneRemovedJob = PruneRemovedDocumentsJob::create(
            config.getPruneRemovedDocumentsConfig(),
            controller.retainDB(),
            *removedSubDB.meta_store(),
            removedSubDB.sub_db_id(),
            bucketSpace,
            docTypeName,
            prdHandler,
            controller.masterThread(),
            bucketExecutor);
    controller.registerJobInMasterThread(
            trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRemovedJob)));

    // Lid space compaction: one handler per sub database (ready, removed, not ready).
    const bool lidSpaceCompactionEnabled = !config.getLidSpaceCompactionConfig().isDisabled();
    if (lidSpaceCompactionEnabled) {
        ILidSpaceCompactionHandler::Vector handlers;
        handlers.push_back(std::make_shared<LidSpaceCompactionHandler>(controller.getReadySubDB(), docTypeName));
        handlers.push_back(std::make_shared<LidSpaceCompactionHandler>(controller.getRemSubDB(), docTypeName));
        handlers.push_back(std::make_shared<LidSpaceCompactionHandler>(controller.getNotReadySubDB(), docTypeName));
        injectLidSpaceCompactionJobs(controller,
                                     config,
                                     bucketExecutor,
                                     std::move(handlers),
                                     opStorer,
                                     jobTrackers.getLidSpaceCompact(),
                                     diskMemUsageNotifier,
                                     clusterStateChangedNotifier,
                                     calc,
                                     bucketSpace);
    }

    // Bucket move job.
    injectBucketMoveJob(controller,
                        config,
                        bucketExecutor,
                        bucketCreateNotifier,
                        docTypeName,
                        bucketSpace,
                        moveHandler,
                        bucketModifiedHandler,
                        clusterStateChangedNotifier,
                        bucketStateChangedNotifier,
                        calc,
                        jobTrackers,
                        diskMemUsageNotifier);

    // Attribute usage sampling takes ownership of both attribute managers.
    auto sampleUsageJob = std::make_unique<SampleAttributeUsageJob>(
            std::move(readyAttributeManager),
            std::move(notReadyAttributeManager),
            attributeUsageFilter,
            docTypeName,
            config.getAttributeUsageSampleInterval());
    controller.registerJobInMasterThread(std::move(sampleUsageJob));
}
} // namespace proton
|