summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-25 21:10:02 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-01-25 21:10:02 +0000
commit608fbab3c0622ca380dc519b24da2b581a37d6ae (patch)
tree1dd7f7be4f09fbd61ae1aaeb243ee8bbec0c85f1 /searchcore
parent5410d33fd8146623e2b380dcfe55530b232df336 (diff)
- Kill maintenance jobs prior to doing reconfig.
- Start if they are stopped.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h11
3 files changed, 25 insertions, 17 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 17843a5cd22..9df81e5ceec 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -411,7 +411,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
LOG(error, "Applying config to closed document db");
return;
}
- _maintenanceController.killJobs();
+
ConfigComparisonResult cmpres;
Schema::SP oldSchema;
int64_t generation = configSnapshot->getGeneration();
@@ -432,6 +432,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
cmpres.importedFieldsChanged = true;
}
const ReconfigParams params(cmpres);
+
// Save config via config manager if replay is done.
bool equalReplayConfig =
*DocumentDBConfig::makeReplayConfig(configSnapshot) ==
@@ -453,6 +454,10 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1, std::make_shared<vespalib::KeepAlive<FeedHandler::CommitResult>>(std::move(commit_result)));
_writeService.sync();
}
+ if (params.shouldMaintenanceControllerChange()) {
+ _maintenanceController.killJobs();
+ }
+
if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) {
_writeServiceConfig.update(configSnapshot->get_threading_service_config());
}
@@ -479,7 +484,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_state.clearDelayedConfig();
}
setActiveConfig(configSnapshot, generation);
- forwardMaintenanceConfig();
+ if (params.shouldMaintenanceControllerChange() || _maintenanceController.getPaused()) {
+ forwardMaintenanceConfig();
+ }
_writeFilter.setConfig(configSnapshot->getMaintenanceConfigSP()->getAttributeUsageFilterConfig());
if (_subDBs.getReprocessingRunner().empty()) {
_subDBs.pruneRemovedFields(serialNum);
@@ -997,7 +1004,7 @@ DocumentDB::forwardMaintenanceConfig()
const auto &attributes_config = activeConfig->getAttributesConfig();
auto attribute_config_inspector = std::make_unique<AttributeConfigInspector>(attributes_config);
if (!_state.getClosed()) {
- if (_maintenanceController.getStarted() && !_maintenanceController.getStopping()) {
+ if (_maintenanceController.getPaused()) {
injectMaintenanceJobs(*maintenanceConfig, std::move(attribute_config_inspector));
}
_maintenanceController.newConfig(maintenanceConfig);
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 9d148a43964..5cedcaf1c3d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -25,7 +25,7 @@ private:
MaintenanceJobRunner *_job;
public:
JobWrapperTask(MaintenanceJobRunner *job) : _job(job) {}
- virtual void run() override { _job->run(); }
+ void run() override { _job->run(); }
};
}
@@ -42,8 +42,7 @@ MaintenanceController::MaintenanceController(IThreadService &masterThread,
_periodicTimer(),
_config(),
_frozenBuckets(masterThread),
- _started(false),
- _stopping(false),
+ _state(State::INITIALIZING),
_docTypeName(docTypeName),
_jobs(),
_jobsLock()
@@ -83,6 +82,9 @@ MaintenanceController::registerJob(Executor & executor, IMaintenanceJob::UP job)
void
MaintenanceController::killJobs()
{
+ if (_state == State::STARTED) {
+ _state = State::PAUSED;
+ }
// Called by master write thread
assert(_masterThread.isCurrentThread());
LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId());
@@ -116,7 +118,7 @@ void
MaintenanceController::stop()
{
assert(!_masterThread.isCurrentThread());
- _masterThread.execute(makeLambdaTask([this]() { _stopping = true; killJobs(); }));
+ _masterThread.execute(makeLambdaTask([this]() { _state = State::STOPPING; killJobs(); }));
_masterThread.sync(); // Wait for killJobs()
_masterThread.sync(); // Wait for already scheduled maintenance jobs and performHoldJobs
}
@@ -134,9 +136,9 @@ void
MaintenanceController::start(const DocumentDBMaintenanceConfig::SP &config)
{
// Called by master write thread
- assert(!_started);
+ assert(_state == State::INITIALIZING);
_config = config;
- _started = true;
+ _state = State::STARTED;
restart();
}
@@ -145,7 +147,7 @@ void
MaintenanceController::restart()
{
// Called by master write thread
- if (!_started || _stopping || !_readySubDB.valid()) {
+ if (!getStarted() || getStopping() || !_readySubDB.valid()) {
return;
}
_periodicTimer = std::make_unique<vespalib::ScheduledExecutor>();
@@ -207,7 +209,7 @@ MaintenanceController::syncSubDBs(const MaintenanceDocumentSubDB &readySubDB,
_readySubDB = readySubDB;
_remSubDB = remSubDB;
_notReadySubDB = notReadySubDB;
- if (!oldValid && _started) {
+ if (!oldValid && getStarted()) {
restart();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index bbaeb176a17..3c8510e6c66 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -34,6 +34,7 @@ public:
using DocumentDBMaintenanceConfigSP = std::shared_ptr<DocumentDBMaintenanceConfig>;
using JobList = std::vector<std::shared_ptr<MaintenanceJobRunner>>;
using UP = std::unique_ptr<MaintenanceController>;
+ enum class State {INITIALIZING, STARTED, PAUSED, STOPPING};
MaintenanceController(IThreadService &masterThread, vespalib::SyncableThreadExecutor & defaultExecutor, const DocTypeName &docTypeName);
@@ -63,8 +64,9 @@ public:
operator const IFrozenBucketHandler &() const { return _frozenBuckets; }
operator IFrozenBucketHandler &() { return _frozenBuckets; }
- bool getStarted() const { return _started; }
- bool getStopping() const { return _stopping; }
+ bool getStarted() const { return _state >= State::STARTED; }
+ bool getStopping() const { return _state == State::STOPPING; }
+ bool getPaused() const { return _state == State::PAUSED; }
const MaintenanceDocumentSubDB & getReadySubDB() const { return _readySubDB; }
const MaintenanceDocumentSubDB & getRemSubDB() const { return _remSubDB; }
@@ -82,8 +84,7 @@ private:
std::unique_ptr<vespalib::ScheduledExecutor> _periodicTimer;
DocumentDBMaintenanceConfigSP _config;
FrozenBuckets _frozenBuckets;
- bool _started;
- bool _stopping;
+ State _state;
const DocTypeName &_docTypeName;
JobList _jobs;
mutable Mutex _jobsLock;
@@ -95,6 +96,4 @@ private:
void registerJob(vespalib::Executor & executor, IMaintenanceJob::UP job);
};
-
} // namespace proton
-