summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-01 21:04:15 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-02-01 21:04:15 +0000
commit81a749521399c37483df008c308b4dfafa23bc84 (patch)
tree18db5d6b17b1222d54da9672d0864ae015fd81e2 /storage
parentfdf25819b9c89ccebbe2f1dea307bbb515ec639d (diff)
Disconnect upcalls early during shutdown in order to let task drain out.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp26
-rw-r--r--storage/src/vespa/storage/persistence/messages.cpp7
2 files changed, 27 insertions, 6 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 42f9e876eee..fb8e3854509 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -820,6 +820,13 @@ FileStorManager::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
void FileStorManager::onClose()
{
LOG(debug, "Start closing");
+ std::unique_ptr<vespalib::IDestructorCallback> toDestruct;
+ {
+ std::lock_guard guard(_executeLock);
+ toDestruct = std::move(_bucketExecutorRegistration);
+ }
+ toDestruct.reset();
+ _resource_usage_listener_registration.reset();
// Avoid getting config during shutdown
_configFetcher.close();
LOG(debug, "Closed _configFetcher.");
@@ -985,18 +992,18 @@ void FileStorManager::initialize_bucket_databases_from_provider() {
class FileStorManager::TrackExecutedTasks : public vespalib::Executor::Task {
public:
- TrackExecutedTasks(FileStorManager & manager);
+ TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager);
void run() override;
private:
FileStorManager & _manager;
size_t _serialNum;
};
-FileStorManager::TrackExecutedTasks::TrackExecutedTasks(FileStorManager & manager)
+FileStorManager::TrackExecutedTasks::TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager)
: _manager(manager),
_serialNum(0)
{
- std::lock_guard guard(_manager._executeLock);
+ (void) guard;
_serialNum = _manager._executeCount++;
_manager._tasksInExecute.insert(_serialNum);
}
@@ -1015,8 +1022,16 @@ FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketT
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::execute"));
if (entry.exist()) {
- auto trackBuckets = std::make_unique<TrackExecutedTasks>(*this);
- _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackBuckets), std::move(task)));
+ std::unique_ptr<TrackExecutedTasks> trackTasks;
+ {
+ std::lock_guard guard(_executeLock);
+ if (_bucketExecutorRegistration) {
+ trackTasks = std::make_unique<TrackExecutedTasks>(guard, *this);
+ }
+ }
+ if (trackTasks) {
+ _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackTasks), std::move(task)));
+ }
}
return task;
}
@@ -1036,6 +1051,7 @@ areTasksCompleteUntil(const vespalib::hash_set<size_t> &inFlight, size_t limit)
void
FileStorManager::sync() {
std::unique_lock guard(_executeLock);
+ if ( ! _bucketExecutorRegistration) return;
_notifyAfterExecute = true;
_syncCond.wait(guard, [this, limit=_executeCount]() {
return areTasksCompleteUntil(_tasksInExecute, limit);
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 14f41850a4f..6b1e0e50c65 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -192,7 +192,11 @@ RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket,
_bucket(bucket)
{ }
-RunTaskCommand::~RunTaskCommand() = default;
+RunTaskCommand::~RunTaskCommand() {
+ if (_afterRun) {
+ _afterRun->run();
+ }
+}
void
RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete)
@@ -202,6 +206,7 @@ RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestr
}
if (_afterRun) {
_afterRun->run();
+ _afterRun.reset();
}
}