From 608fbab3c0622ca380dc519b24da2b581a37d6ae Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 25 Jan 2021 21:10:02 +0000 Subject: - Kill maintenance jobs prior to doing reconfig. - Start if they are stopped. --- .../src/vespa/searchcore/proton/server/documentdb.cpp | 13 ++++++++++--- .../searchcore/proton/server/maintenancecontroller.cpp | 18 ++++++++++-------- .../searchcore/proton/server/maintenancecontroller.h | 11 +++++------ 3 files changed, 25 insertions(+), 17 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 17843a5cd22..9df81e5ceec 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -411,7 +411,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum LOG(error, "Applying config to closed document db"); return; } - _maintenanceController.killJobs(); + ConfigComparisonResult cmpres; Schema::SP oldSchema; int64_t generation = configSnapshot->getGeneration(); @@ -432,6 +432,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum cmpres.importedFieldsChanged = true; } const ReconfigParams params(cmpres); + // Save config via config manager if replay is done. bool equalReplayConfig = *DocumentDBConfig::makeReplayConfig(configSnapshot) == @@ -453,6 +454,10 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1, std::make_shared>(std::move(commit_result))); _writeService.sync(); } + if (params.shouldMaintenanceControllerChange()) { + _maintenanceController.killJobs(); + } + if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) { _writeServiceConfig.update(configSnapshot->get_threading_service_config()); } @@ -479,7 +484,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _state.clearDelayedConfig(); } setActiveConfig(configSnapshot, generation); - forwardMaintenanceConfig(); + if (params.shouldMaintenanceControllerChange() || _maintenanceController.getPaused()) { + forwardMaintenanceConfig(); + } _writeFilter.setConfig(configSnapshot->getMaintenanceConfigSP()->getAttributeUsageFilterConfig()); if (_subDBs.getReprocessingRunner().empty()) { _subDBs.pruneRemovedFields(serialNum); @@ -997,7 +1004,7 @@ DocumentDB::forwardMaintenanceConfig() const auto &attributes_config = activeConfig->getAttributesConfig(); auto attribute_config_inspector = std::make_unique(attributes_config); if (!_state.getClosed()) { - if (_maintenanceController.getStarted() && !_maintenanceController.getStopping()) { + if (_maintenanceController.getPaused()) { injectMaintenanceJobs(*maintenanceConfig, std::move(attribute_config_inspector)); } _maintenanceController.newConfig(maintenanceConfig); diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 9d148a43964..5cedcaf1c3d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -25,7 +25,7 @@ private: MaintenanceJobRunner *_job; public: JobWrapperTask(MaintenanceJobRunner *job) : _job(job) {} - virtual void run() override { _job->run(); } + void run() override { _job->run(); } }; } @@ -42,8 +42,7 @@ MaintenanceController::MaintenanceController(IThreadService &masterThread, _periodicTimer(), _config(), _frozenBuckets(masterThread), - _started(false), - _stopping(false), + _state(State::INITIALIZING), _docTypeName(docTypeName), _jobs(), _jobsLock() @@ -83,6 +82,9 @@ MaintenanceController::registerJob(Executor & executor, IMaintenanceJob::UP job) void MaintenanceController::killJobs() { + if (_state == State::STARTED) { + _state = State::PAUSED; + } // Called by master write thread assert(_masterThread.isCurrentThread()); LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId()); @@ -116,7 +118,7 @@ void MaintenanceController::stop() { assert(!_masterThread.isCurrentThread()); - _masterThread.execute(makeLambdaTask([this]() { _stopping = true; killJobs(); })); + _masterThread.execute(makeLambdaTask([this]() { _state = State::STOPPING; killJobs(); })); _masterThread.sync(); // Wait for killJobs() _masterThread.sync(); // Wait for already scheduled maintenance jobs and performHoldJobs } @@ -134,9 +136,9 @@ void MaintenanceController::start(const DocumentDBMaintenanceConfig::SP &config) { // Called by master write thread - assert(!_started); + assert(_state == State::INITIALIZING); _config = config; - _started = true; + _state = State::STARTED; restart(); } @@ -145,7 +147,7 @@ void MaintenanceController::restart() { // Called by master write thread - if (!_started || _stopping || !_readySubDB.valid()) { + if (!getStarted() || getStopping() || !_readySubDB.valid()) { return; } _periodicTimer = std::make_unique(); @@ -207,7 +209,7 @@ MaintenanceController::syncSubDBs(const MaintenanceDocumentSubDB &readySubDB, _readySubDB = readySubDB; _remSubDB = remSubDB; _notReadySubDB = notReadySubDB; - if (!oldValid && _started) { + if (!oldValid && getStarted()) { restart(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index bbaeb176a17..3c8510e6c66 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -34,6 +34,7 @@ public: using DocumentDBMaintenanceConfigSP = std::shared_ptr; using JobList = std::vector>; using UP = std::unique_ptr; + enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; MaintenanceController(IThreadService &masterThread, vespalib::SyncableThreadExecutor & defaultExecutor, const DocTypeName &docTypeName); @@ -63,8 +64,9 @@ public: operator const IFrozenBucketHandler &() const { return _frozenBuckets; } operator IFrozenBucketHandler &() { return _frozenBuckets; } - bool getStarted() const { return _started; } - bool getStopping() const { return _stopping; } + bool getStarted() const { return _state >= State::STARTED; } + bool getStopping() const { return _state == State::STOPPING; } + bool getPaused() const { return _state == State::PAUSED; } const MaintenanceDocumentSubDB & getReadySubDB() const { return _readySubDB; } const MaintenanceDocumentSubDB & getRemSubDB() const { return _remSubDB; } @@ -82,8 +84,7 @@ private: std::unique_ptr _periodicTimer; DocumentDBMaintenanceConfigSP _config; FrozenBuckets _frozenBuckets; - bool _started; - bool _stopping; + State _state; const DocTypeName &_docTypeName; JobList _jobs; mutable Mutex _jobsLock; @@ -95,6 +96,4 @@ private: void registerJob(vespalib::Executor & executor, IMaintenanceJob::UP job); }; - } // namespace proton - -- cgit v1.2.3