diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-01 21:04:15 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-01 21:04:15 +0000 |
commit | 81a749521399c37483df008c308b4dfafa23bc84 (patch) | |
tree | 18db5d6b17b1222d54da9672d0864ae015fd81e2 /storage | |
parent | fdf25819b9c89ccebbe2f1dea307bbb515ec639d (diff) |
Disconnect upcalls early during shutdown in order to let task drain out.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp | 26 | ||||
-rw-r--r-- | storage/src/vespa/storage/persistence/messages.cpp | 7 |
2 files changed, 27 insertions, 6 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 42f9e876eee..fb8e3854509 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -820,6 +820,13 @@ FileStorManager::sendUp(const std::shared_ptr<api::StorageMessage>& msg) void FileStorManager::onClose() { LOG(debug, "Start closing"); + std::unique_ptr<vespalib::IDestructorCallback> toDestruct; + { + std::lock_guard guard(_executeLock); + toDestruct = std::move(_bucketExecutorRegistration); + } + toDestruct.reset(); + _resource_usage_listener_registration.reset(); // Avoid getting config during shutdown _configFetcher.close(); LOG(debug, "Closed _configFetcher."); @@ -985,18 +992,18 @@ void FileStorManager::initialize_bucket_databases_from_provider() { class FileStorManager::TrackExecutedTasks : public vespalib::Executor::Task { public: - TrackExecutedTasks(FileStorManager & manager); + TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager); void run() override; private: FileStorManager & _manager; size_t _serialNum; }; -FileStorManager::TrackExecutedTasks::TrackExecutedTasks(FileStorManager & manager) +FileStorManager::TrackExecutedTasks::TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager) : _manager(manager), _serialNum(0) { - std::lock_guard guard(_manager._executeLock); + (void) guard; _serialNum = _manager._executeCount++; _manager._tasksInExecute.insert(_serialNum); } @@ -1015,8 +1022,16 @@ FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketT StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::execute")); if (entry.exist()) { - auto trackBuckets = std::make_unique<TrackExecutedTasks>(*this); - _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackBuckets), std::move(task))); + std::unique_ptr<TrackExecutedTasks> trackTasks; + { + std::lock_guard guard(_executeLock); + if (_bucketExecutorRegistration) { + trackTasks = std::make_unique<TrackExecutedTasks>(guard, *this); + } + } + if (trackTasks) { + _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackTasks), std::move(task))); + } } return task; } @@ -1036,6 +1051,7 @@ areTasksCompleteUntil(const vespalib::hash_set<size_t> &inFlight, size_t limit) void FileStorManager::sync() { std::unique_lock guard(_executeLock); + if ( ! _bucketExecutorRegistration) return; _notifyAfterExecute = true; _syncCond.wait(guard, [this, limit=_executeCount]() { return areTasksCompleteUntil(_tasksInExecute, limit); diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 14f41850a4f..6b1e0e50c65 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -192,7 +192,11 @@ RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, _bucket(bucket) { } -RunTaskCommand::~RunTaskCommand() = default; +RunTaskCommand::~RunTaskCommand() { + if (_afterRun) { + _afterRun->run(); + } +} void RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) @@ -202,6 +206,7 @@ RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestr } if (_afterRun) { _afterRun->run(); + _afterRun.reset(); } } |