diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2016-09-20 13:54:03 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2016-09-20 13:54:03 +0000 |
commit | 36ea7aa48be3dc3972ec82b228e13dfc0134f663 (patch) | |
tree | 2e1126e216b4cd27a1eb3f2abb90c4dbf41d39f7 /filedistribution | |
parent | 676e20834c8cfa53dda25a851ce4fb4f3171ac6a (diff) |
Do not stop the eventHandler thread until all events has been handled.
Diffstat (limited to 'filedistribution')
3 files changed, 30 insertions, 10 deletions
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp index 4d81970f75c..daef2099823 100644 --- a/filedistribution/src/apps/filedistributor/filedistributor.cpp +++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp @@ -110,6 +110,10 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, _downloader->close(); _downloaderEventLoopThread->join(); + if ( !_downloader->drained() ) { + LOG(error, "The filedownloader did not drain fully. We will just exit quickly and let a restart repair it for us."); + std::quick_exit(67); + } } }; diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp index 4ac8bd650ca..ba0dbf8e20d 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp @@ -232,18 +232,29 @@ FileDownloader::FileDownloader(const std::shared_ptr<FileDistributionTracker>& t } -FileDownloader::~FileDownloader() { +void +FileDownloader::drain() { EventHandler eventHandler(this); size_t cnt = 0; + size_t waitCount = 0; do { - LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests); + LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests.load()); while (_session.wait_for_alert(libtorrent::milliseconds(20))) { std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); eventHandler.handle(std::move(alert)); ++cnt; } - } while (_outstanding_SRD_requests > 0); - LOG(debug, "handled %zu alerts in destructor", cnt); + waitCount++; + } while (!drained() && (waitCount < 1000)); + LOG(debug, "handled %zu alerts during draining.", cnt); + if (!drained()) { + LOG(error, "handled %zu alerts during draining. But there are still %zu left.", cnt, _outstanding_SRD_requests.load()); + LOG(error, "We have been waiting for stuff that did not happen."); + } +} + +FileDownloader::~FileDownloader() { + assert(drained()); } void @@ -305,6 +316,7 @@ FileDownloader::hasTorrent(const std::string& fileReference) const { void FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffer) { + if (closed()) { return; } LockGuard guard(_modifyTorrentsDownloadingMutex); boost::optional<ResumeDataBuffer> resumeData = getResumeData(fileReference); @@ -314,8 +326,7 @@ FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffe libtorrent::lazy_entry entry; #pragma GCC diagnostic ignored "-Wdeprecated-declarations" - libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(), - entry); //out + libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(), entry); //out #pragma GCC diagnostic pop libtorrent::add_torrent_params torrentParams; @@ -325,8 +336,9 @@ FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffe torrentParams.auto_managed = false; torrentParams.paused = false; - if (resumeData) + if (resumeData) { torrentParams.resume_data = *resumeData; //vector will be swapped + } libtorrent::torrent_handle torrentHandle = _session.add_torrent(torrentParams); @@ -354,6 +366,7 @@ FileDownloader::deleteTorrentData(const libtorrent::torrent_handle& torrent, Loc void FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain) { + if (closed()) { return; } LockGuard guard(_modifyTorrentsDownloadingMutex); std::set<std::string> currentFiles; @@ -380,7 +393,7 @@ void FileDownloader::runEventLoop() { EventHandler eventHandler(this); while ( ! closed() ) { try { - if (_session.wait_for_alert(libtorrent::milliseconds(100))) { + while (_session.wait_for_alert(libtorrent::milliseconds(100))) { std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); eventHandler.handle(std::move(alert)); } @@ -388,6 +401,7 @@ void FileDownloader::runEventLoop() { LOG(info, "Connection loss in downloader event loop, resuming. %s", e.what()); } } + drain(); } bool diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h index e233dc46ecb..248a906ccb5 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h @@ -24,7 +24,7 @@ class FileDownloader ~LogSessionDeconstructed(); }; - size_t _outstanding_SRD_requests; + std::atomic<size_t> _outstanding_SRD_requests; std::shared_ptr<FileDistributionTracker> _tracker; std::mutex _modifyTorrentsDownloadingMutex; @@ -43,6 +43,8 @@ class FileDownloader void deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&); void listen(); + bool closed() const; + void drain(); public: // accounting of save-resume-data requests: void didRequestSRD() { ++_outstanding_SRD_requests; } @@ -69,7 +71,7 @@ public: void setMaxDownloadSpeed(double MBPerSec); void setMaxUploadSpeed(double MBPerSec); void close(); - bool closed() const; + bool drained() const { return _outstanding_SRD_requests == 0; } const std::string _hostName; const int _port; |