summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2016-09-20 13:54:03 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2016-09-20 13:54:03 +0000
commit36ea7aa48be3dc3972ec82b228e13dfc0134f663 (patch)
tree2e1126e216b4cd27a1eb3f2abb90c4dbf41d39f7 /filedistribution
parent676e20834c8cfa53dda25a851ce4fb4f3171ac6a (diff)
Do not stop the eventHandler thread until all events has been handled.
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/apps/filedistributor/filedistributor.cpp4
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp30
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.h6
3 files changed, 30 insertions, 10 deletions
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp
index 4d81970f75c..daef2099823 100644
--- a/filedistribution/src/apps/filedistributor/filedistributor.cpp
+++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp
@@ -110,6 +110,10 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
_downloader->close();
_downloaderEventLoopThread->join();
+ if ( !_downloader->drained() ) {
+ LOG(error, "The filedownloader did not drain fully. We will just exit quickly and let a restart repair it for us.");
+ std::quick_exit(67);
+ }
}
};
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
index 4ac8bd650ca..ba0dbf8e20d 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
@@ -232,18 +232,29 @@ FileDownloader::FileDownloader(const std::shared_ptr<FileDistributionTracker>& t
}
-FileDownloader::~FileDownloader() {
+void
+FileDownloader::drain() {
EventHandler eventHandler(this);
size_t cnt = 0;
+ size_t waitCount = 0;
do {
- LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests);
+ LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests.load());
while (_session.wait_for_alert(libtorrent::milliseconds(20))) {
std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
eventHandler.handle(std::move(alert));
++cnt;
}
- } while (_outstanding_SRD_requests > 0);
- LOG(debug, "handled %zu alerts in destructor", cnt);
+ waitCount++;
+ } while (!drained() && (waitCount < 1000));
+ LOG(debug, "handled %zu alerts during draining.", cnt);
+ if (!drained()) {
+ LOG(error, "handled %zu alerts during draining. But there are still %zu left.", cnt, _outstanding_SRD_requests.load());
+ LOG(error, "We have been waiting for stuff that did not happen.");
+ }
+}
+
+FileDownloader::~FileDownloader() {
+ assert(drained());
}
void
@@ -305,6 +316,7 @@ FileDownloader::hasTorrent(const std::string& fileReference) const {
void
FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffer) {
+ if (closed()) { return; }
LockGuard guard(_modifyTorrentsDownloadingMutex);
boost::optional<ResumeDataBuffer> resumeData = getResumeData(fileReference);
@@ -314,8 +326,7 @@ FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffe
libtorrent::lazy_entry entry;
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(),
- entry); //out
+ libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(), entry); //out
#pragma GCC diagnostic pop
libtorrent::add_torrent_params torrentParams;
@@ -325,8 +336,9 @@ FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffe
torrentParams.auto_managed = false;
torrentParams.paused = false;
- if (resumeData)
+ if (resumeData) {
torrentParams.resume_data = *resumeData; //vector will be swapped
+ }
libtorrent::torrent_handle torrentHandle = _session.add_torrent(torrentParams);
@@ -354,6 +366,7 @@ FileDownloader::deleteTorrentData(const libtorrent::torrent_handle& torrent, Loc
void
FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain) {
+ if (closed()) { return; }
LockGuard guard(_modifyTorrentsDownloadingMutex);
std::set<std::string> currentFiles;
@@ -380,7 +393,7 @@ void FileDownloader::runEventLoop() {
EventHandler eventHandler(this);
while ( ! closed() ) {
try {
- if (_session.wait_for_alert(libtorrent::milliseconds(100))) {
+ while (_session.wait_for_alert(libtorrent::milliseconds(100))) {
std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
eventHandler.handle(std::move(alert));
}
@@ -388,6 +401,7 @@ void FileDownloader::runEventLoop() {
LOG(info, "Connection loss in downloader event loop, resuming. %s", e.what());
}
}
+ drain();
}
bool
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
index e233dc46ecb..248a906ccb5 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
@@ -24,7 +24,7 @@ class FileDownloader
~LogSessionDeconstructed();
};
- size_t _outstanding_SRD_requests;
+ std::atomic<size_t> _outstanding_SRD_requests;
std::shared_ptr<FileDistributionTracker> _tracker;
std::mutex _modifyTorrentsDownloadingMutex;
@@ -43,6 +43,8 @@ class FileDownloader
void deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&);
void listen();
+ bool closed() const;
+ void drain();
public:
// accounting of save-resume-data requests:
void didRequestSRD() { ++_outstanding_SRD_requests; }
@@ -69,7 +71,7 @@ public:
void setMaxDownloadSpeed(double MBPerSec);
void setMaxUploadSpeed(double MBPerSec);
void close();
- bool closed() const;
+ bool drained() const { return _outstanding_SRD_requests == 0; }
const std::string _hostName;
const int _port;