aboutsummaryrefslogtreecommitdiffstats
path: root/filedistribution/src/apps/filedistributor/filedistributor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'filedistribution/src/apps/filedistributor/filedistributor.cpp')
-rw-r--r--filedistribution/src/apps/filedistributor/filedistributor.cpp105
1 files changed, 75 insertions, 30 deletions
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp
index 0c89fb08b6b..62def74ee7a 100644
--- a/filedistribution/src/apps/filedistributor/filedistributor.cpp
+++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp
@@ -5,6 +5,11 @@
#include <cstdlib>
#include <boost/program_options.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/scope_exit.hpp>
@@ -33,7 +38,7 @@ const char* programName = "filedistributor";
#include <vespa/log/log.h>
LOG_SETUP(programName);
-using namespace std::literals;
+namespace ll = boost::lambda;
using namespace filedistribution;
using cloud::config::ZookeepersConfig;
@@ -48,20 +53,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
class Components {
ComponentsDeleter _componentsDeleter;
public:
- const std::shared_ptr<ZKFacade> _zk;
- const std::shared_ptr<FileDistributionModelImpl> _model;
- const std::shared_ptr<FileDistributorTrackerImpl> _tracker;
- const std::shared_ptr<FileDownloader> _downloader;
- const FileDownloaderManager::SP _manager;
- const FileDistributorRPC::SP _rpcHandler;
- const std::shared_ptr<StateServerImpl> _stateServer;
+ const boost::shared_ptr<ZKFacade> _zk;
+ const boost::shared_ptr<FileDistributionModelImpl> _model;
+ const boost::shared_ptr<FileDistributorTrackerImpl> _tracker;
+ const boost::shared_ptr<FileDownloader> _downloader;
+ const boost::shared_ptr<FileDownloaderManager> _manager;
+ const boost::shared_ptr<FileDistributorRPC> _rpcHandler;
+ const boost::shared_ptr<StateServerImpl> _stateServer;
private:
- std::thread _downloaderEventLoopThread;
+ boost::thread _downloaderEventLoopThread;
config::ConfigFetcher _configFetcher;
+
template <class T>
- typename std::shared_ptr<T> track(T* component) {
+ typename boost::shared_ptr<T> track(T* component) {
return _componentsDeleter.track(component);
}
@@ -69,24 +75,29 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
Components(const Components &) = delete;
Components & operator = (const Components &) = delete;
- Components(const config::ConfigUri & configUri,
+ Components(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower,
+ const config::ConfigUri & configUri,
const ZookeepersConfig& zooKeepersConfig,
const FiledistributorConfig& fileDistributorConfig,
const FiledistributorrpcConfig& rpcConfig)
- :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist))),
+ :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, exceptionRethrower))),
_model(track(new FileDistributionModelImpl(
fileDistributorConfig.hostname,
fileDistributorConfig.torrentport,
- _zk))),
- _tracker(track(new FileDistributorTrackerImpl(_model))),
- _downloader(track(new FileDownloader(_tracker,
- fileDistributorConfig.hostname,
- fileDistributorConfig.torrentport,
- boost::filesystem::path(fileDistributorConfig.filedbpath)))),
+ _zk,
+ exceptionRethrower))),
+ _tracker(track(new FileDistributorTrackerImpl(_model, exceptionRethrower))),
+ _downloader(track(new FileDownloader(
+ _tracker,
+ fileDistributorConfig.hostname,
+ fileDistributorConfig.torrentport,
+ boost::filesystem::path(fileDistributorConfig.filedbpath),
+ exceptionRethrower))),
_manager(track(new FileDownloaderManager(_downloader, _model))),
_rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))),
_stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))),
- _downloaderEventLoopThread([downloader=_downloader] () { downloader->runEventLoop(); }),
+ _downloaderEventLoopThread(
+ ll::bind(&FileDownloader::runEventLoop, _downloader.get())),
_configFetcher(configUri.getContext())
{
@@ -109,20 +120,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
//Do not waste time retrying zookeeper operations when going down.
_zk->disableRetries();
- _downloader->close();
+ _downloaderEventLoopThread.interrupt();
_downloaderEventLoopThread.join();
}
};
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _configMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _configMutex;
bool _completeReconfigurationNeeded;
std::unique_ptr<ZookeepersConfig> _zooKeepersConfig;
std::unique_ptr<FiledistributorConfig> _fileDistributorConfig;
std::unique_ptr<FiledistributorrpcConfig> _rpcConfig;
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
std::unique_ptr<Components> _components;
public:
FileDistributor(const FileDistributor &) = delete;
@@ -133,6 +145,7 @@ public:
_zooKeepersConfig(),
_fileDistributorConfig(),
_rpcConfig(),
+ _exceptionRethrower(),
_components()
{ }
@@ -174,14 +187,35 @@ public:
void run(const config::ConfigUri & configUri) {
while (!askedToShutDown()) {
clearReinitializeFlag();
+ _exceptionRethrower.reset(new ExceptionRethrower());
runImpl(configUri);
+
+ if (_exceptionRethrower->exceptionStored())
+ _exceptionRethrower->rethrow();
}
}
- void createComponents(const config::ConfigUri & configUri) {
+ static void ensureExceptionsStored(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) {
+ //TODO: this is somewhat hackish, refactor to eliminate this later.
+ LOG(debug, "Waiting for shutdown");
+ for (int i=0;
+ i<50 && !exceptionRethrower.unique();
+ ++i) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+ }
+ LOG(debug, "Done waiting for shutdown");
+
+ if (!exceptionRethrower.unique()) {
+ EV_STOPPING(programName, "Forced termination");
+ kill(getpid(), SIGKILL);
+ }
+ }
+
+ void createComponents(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, const config::ConfigUri & configUri) {
LockGuard guard(_configMutex);
_components.reset(
- new Components(configUri,
+ new Components(exceptionRethrower,
+ configUri,
*_zooKeepersConfig,
*_fileDistributorConfig,
*_rpcConfig));
@@ -204,8 +238,17 @@ public:
downloader.setMaxUploadSpeed(config.maxuploadspeed);
}
+ //avoid warning due to scope exit macro
+#pragma GCC diagnostic ignored "-Wshadow"
void runImpl(const config::ConfigUri & configUri) {
- createComponents(configUri);
+
+ BOOST_SCOPE_EXIT((&_components)(&_exceptionRethrower)) {
+ _components.reset();
+ //Ensures that any exception stored during destruction will be available when returning.
+ ensureExceptionsStored(_exceptionRethrower);
+ } BOOST_SCOPE_EXIT_END
+
+ createComponents(_exceptionRethrower, configUri);
// We do not want back to back reinitializing as it gives zero time for serving
// some torrents.
@@ -213,15 +256,17 @@ public:
while (!askedToShutDown() &&
(postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) &&
- !completeReconfigurationNeeded())
- {
- postPoneAskedToReinitializedSecs--;
- std::this_thread::sleep_for(1s);
+ !completeReconfigurationNeeded() &&
+ !_exceptionRethrower->exceptionStored()) {
+ postPoneAskedToReinitializedSecs--;
+ boost::this_thread::sleep(boost::posix_time::seconds(1));
}
- _components.reset();
}
};
+//TODO: use pop in gcc 4.6
+#pragma GCC diagnostic warning "-Wshadow"
+
class FileDistributorApplication : public FastOS_Application {
const config::ConfigUri _configUri;
public: