diff options
Diffstat (limited to 'filedistribution/src/apps/filedistributor/filedistributor.cpp')
-rw-r--r-- | filedistribution/src/apps/filedistributor/filedistributor.cpp | 105 |
1 files changed, 75 insertions, 30 deletions
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp index 0c89fb08b6b..62def74ee7a 100644 --- a/filedistribution/src/apps/filedistributor/filedistributor.cpp +++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp @@ -5,6 +5,11 @@ #include <cstdlib> #include <boost/program_options.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/exception/diagnostic_information.hpp> #include <boost/scope_exit.hpp> @@ -33,7 +38,7 @@ const char* programName = "filedistributor"; #include <vespa/log/log.h> LOG_SETUP(programName); -using namespace std::literals; +namespace ll = boost::lambda; using namespace filedistribution; using cloud::config::ZookeepersConfig; @@ -48,20 +53,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, class Components { ComponentsDeleter _componentsDeleter; public: - const std::shared_ptr<ZKFacade> _zk; - const std::shared_ptr<FileDistributionModelImpl> _model; - const std::shared_ptr<FileDistributorTrackerImpl> _tracker; - const std::shared_ptr<FileDownloader> _downloader; - const FileDownloaderManager::SP _manager; - const FileDistributorRPC::SP _rpcHandler; - const std::shared_ptr<StateServerImpl> _stateServer; + const boost::shared_ptr<ZKFacade> _zk; + const boost::shared_ptr<FileDistributionModelImpl> _model; + const boost::shared_ptr<FileDistributorTrackerImpl> _tracker; + const boost::shared_ptr<FileDownloader> _downloader; + const boost::shared_ptr<FileDownloaderManager> _manager; + const boost::shared_ptr<FileDistributorRPC> _rpcHandler; + const boost::shared_ptr<StateServerImpl> _stateServer; private: - std::thread _downloaderEventLoopThread; + boost::thread _downloaderEventLoopThread; config::ConfigFetcher _configFetcher; + template <class T> - typename std::shared_ptr<T> track(T* component) { + typename boost::shared_ptr<T> track(T* component) { return _componentsDeleter.track(component); } @@ -69,24 +75,29 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, Components(const Components &) = delete; Components & operator = (const Components &) = delete; - Components(const config::ConfigUri & configUri, + Components(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, + const config::ConfigUri & configUri, const ZookeepersConfig& zooKeepersConfig, const FiledistributorConfig& fileDistributorConfig, const FiledistributorrpcConfig& rpcConfig) - :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist))), + :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, exceptionRethrower))), _model(track(new FileDistributionModelImpl( fileDistributorConfig.hostname, fileDistributorConfig.torrentport, - _zk))), - _tracker(track(new FileDistributorTrackerImpl(_model))), - _downloader(track(new FileDownloader(_tracker, - fileDistributorConfig.hostname, - fileDistributorConfig.torrentport, - boost::filesystem::path(fileDistributorConfig.filedbpath)))), + _zk, + exceptionRethrower))), + _tracker(track(new FileDistributorTrackerImpl(_model, exceptionRethrower))), + _downloader(track(new FileDownloader( + _tracker, + fileDistributorConfig.hostname, + fileDistributorConfig.torrentport, + boost::filesystem::path(fileDistributorConfig.filedbpath), + exceptionRethrower))), _manager(track(new FileDownloaderManager(_downloader, _model))), _rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))), _stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))), - _downloaderEventLoopThread([downloader=_downloader] () { downloader->runEventLoop(); }), + _downloaderEventLoopThread( + ll::bind(&FileDownloader::runEventLoop, _downloader.get())), _configFetcher(configUri.getContext()) { @@ -109,20 +120,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, //Do not waste time retrying zookeeper operations when going down. _zk->disableRetries(); - _downloader->close(); + _downloaderEventLoopThread.interrupt(); _downloaderEventLoopThread.join(); } }; - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _configMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _configMutex; bool _completeReconfigurationNeeded; std::unique_ptr<ZookeepersConfig> _zooKeepersConfig; std::unique_ptr<FiledistributorConfig> _fileDistributorConfig; std::unique_ptr<FiledistributorrpcConfig> _rpcConfig; + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; std::unique_ptr<Components> _components; public: FileDistributor(const FileDistributor &) = delete; @@ -133,6 +145,7 @@ public: _zooKeepersConfig(), _fileDistributorConfig(), _rpcConfig(), + _exceptionRethrower(), _components() { } @@ -174,14 +187,35 @@ public: void run(const config::ConfigUri & configUri) { while (!askedToShutDown()) { clearReinitializeFlag(); + _exceptionRethrower.reset(new ExceptionRethrower()); runImpl(configUri); + + if (_exceptionRethrower->exceptionStored()) + _exceptionRethrower->rethrow(); } } - void createComponents(const config::ConfigUri & configUri) { + static void ensureExceptionsStored(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) { + //TODO: this is somewhat hackish, refactor to eliminate this later. + LOG(debug, "Waiting for shutdown"); + for (int i=0; + i<50 && !exceptionRethrower.unique(); + ++i) { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + LOG(debug, "Done waiting for shutdown"); + + if (!exceptionRethrower.unique()) { + EV_STOPPING(programName, "Forced termination"); + kill(getpid(), SIGKILL); + } + } + + void createComponents(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, const config::ConfigUri & configUri) { LockGuard guard(_configMutex); _components.reset( - new Components(configUri, + new Components(exceptionRethrower, + configUri, *_zooKeepersConfig, *_fileDistributorConfig, *_rpcConfig)); @@ -204,8 +238,17 @@ public: downloader.setMaxUploadSpeed(config.maxuploadspeed); } + //avoid warning due to scope exit macro +#pragma GCC diagnostic ignored "-Wshadow" void runImpl(const config::ConfigUri & configUri) { - createComponents(configUri); + + BOOST_SCOPE_EXIT((&_components)(&_exceptionRethrower)) { + _components.reset(); + //Ensures that any exception stored during destruction will be available when returning. + ensureExceptionsStored(_exceptionRethrower); + } BOOST_SCOPE_EXIT_END + + createComponents(_exceptionRethrower, configUri); // We do not want back to back reinitializing as it gives zero time for serving // some torrents. @@ -213,15 +256,17 @@ public: while (!askedToShutDown() && (postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) && - !completeReconfigurationNeeded()) - { - postPoneAskedToReinitializedSecs--; - std::this_thread::sleep_for(1s); + !completeReconfigurationNeeded() && + !_exceptionRethrower->exceptionStored()) { + postPoneAskedToReinitializedSecs--; + boost::this_thread::sleep(boost::posix_time::seconds(1)); } - _components.reset(); } }; +//TODO: use pop in gcc 4.6 +#pragma GCC diagnostic warning "-Wshadow" + class FileDistributorApplication : public FastOS_Application { const config::ConfigUri _configUri; public: |