diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2016-09-15 16:45:03 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-15 16:45:03 +0200 |
commit | 75e9061a9f1a716d3bd79a3496d07f0ad7256e46 (patch) | |
tree | 07a26b47994dc53e8288c9372c3e8febdade6ad5 /filedistribution | |
parent | bf1333c9d6fe3ebd5f736035857f155131702961 (diff) |
Revert "Balder/remove boost noncopyable 2"
Diffstat (limited to 'filedistribution')
34 files changed, 556 insertions, 330 deletions
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp index 0c89fb08b6b..62def74ee7a 100644 --- a/filedistribution/src/apps/filedistributor/filedistributor.cpp +++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp @@ -5,6 +5,11 @@ #include <cstdlib> #include <boost/program_options.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/exception/diagnostic_information.hpp> #include <boost/scope_exit.hpp> @@ -33,7 +38,7 @@ const char* programName = "filedistributor"; #include <vespa/log/log.h> LOG_SETUP(programName); -using namespace std::literals; +namespace ll = boost::lambda; using namespace filedistribution; using cloud::config::ZookeepersConfig; @@ -48,20 +53,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, class Components { ComponentsDeleter _componentsDeleter; public: - const std::shared_ptr<ZKFacade> _zk; - const std::shared_ptr<FileDistributionModelImpl> _model; - const std::shared_ptr<FileDistributorTrackerImpl> _tracker; - const std::shared_ptr<FileDownloader> _downloader; - const FileDownloaderManager::SP _manager; - const FileDistributorRPC::SP _rpcHandler; - const std::shared_ptr<StateServerImpl> _stateServer; + const boost::shared_ptr<ZKFacade> _zk; + const boost::shared_ptr<FileDistributionModelImpl> _model; + const boost::shared_ptr<FileDistributorTrackerImpl> _tracker; + const boost::shared_ptr<FileDownloader> _downloader; + const boost::shared_ptr<FileDownloaderManager> _manager; + const boost::shared_ptr<FileDistributorRPC> _rpcHandler; + const boost::shared_ptr<StateServerImpl> _stateServer; private: - std::thread _downloaderEventLoopThread; + boost::thread _downloaderEventLoopThread; config::ConfigFetcher _configFetcher; + template <class T> - typename std::shared_ptr<T> track(T* component) { + typename boost::shared_ptr<T> track(T* component) { return _componentsDeleter.track(component); } @@ -69,24 +75,29 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, Components(const Components &) = delete; Components & operator = (const Components &) = delete; - Components(const config::ConfigUri & configUri, + Components(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, + const config::ConfigUri & configUri, const ZookeepersConfig& zooKeepersConfig, const FiledistributorConfig& fileDistributorConfig, const FiledistributorrpcConfig& rpcConfig) - :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist))), + :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, exceptionRethrower))), _model(track(new FileDistributionModelImpl( fileDistributorConfig.hostname, fileDistributorConfig.torrentport, - _zk))), - _tracker(track(new FileDistributorTrackerImpl(_model))), - _downloader(track(new FileDownloader(_tracker, - fileDistributorConfig.hostname, - fileDistributorConfig.torrentport, - boost::filesystem::path(fileDistributorConfig.filedbpath)))), + _zk, + exceptionRethrower))), + _tracker(track(new FileDistributorTrackerImpl(_model, exceptionRethrower))), + _downloader(track(new FileDownloader( + _tracker, + fileDistributorConfig.hostname, + fileDistributorConfig.torrentport, + boost::filesystem::path(fileDistributorConfig.filedbpath), + exceptionRethrower))), _manager(track(new FileDownloaderManager(_downloader, _model))), _rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))), _stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))), - _downloaderEventLoopThread([downloader=_downloader] () { downloader->runEventLoop(); }), + _downloaderEventLoopThread( + ll::bind(&FileDownloader::runEventLoop, _downloader.get())), _configFetcher(configUri.getContext()) { @@ -109,20 +120,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, //Do not waste time retrying zookeeper operations when going down. _zk->disableRetries(); - _downloader->close(); + _downloaderEventLoopThread.interrupt(); _downloaderEventLoopThread.join(); } }; - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _configMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _configMutex; bool _completeReconfigurationNeeded; std::unique_ptr<ZookeepersConfig> _zooKeepersConfig; std::unique_ptr<FiledistributorConfig> _fileDistributorConfig; std::unique_ptr<FiledistributorrpcConfig> _rpcConfig; + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; std::unique_ptr<Components> _components; public: FileDistributor(const FileDistributor &) = delete; @@ -133,6 +145,7 @@ public: _zooKeepersConfig(), _fileDistributorConfig(), _rpcConfig(), + _exceptionRethrower(), _components() { } @@ -174,14 +187,35 @@ public: void run(const config::ConfigUri & configUri) { while (!askedToShutDown()) { clearReinitializeFlag(); + _exceptionRethrower.reset(new ExceptionRethrower()); runImpl(configUri); + + if (_exceptionRethrower->exceptionStored()) + _exceptionRethrower->rethrow(); } } - void createComponents(const config::ConfigUri & configUri) { + static void ensureExceptionsStored(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) { + //TODO: this is somewhat hackish, refactor to eliminate this later. + LOG(debug, "Waiting for shutdown"); + for (int i=0; + i<50 && !exceptionRethrower.unique(); + ++i) { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + LOG(debug, "Done waiting for shutdown"); + + if (!exceptionRethrower.unique()) { + EV_STOPPING(programName, "Forced termination"); + kill(getpid(), SIGKILL); + } + } + + void createComponents(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, const config::ConfigUri & configUri) { LockGuard guard(_configMutex); _components.reset( - new Components(configUri, + new Components(exceptionRethrower, + configUri, *_zooKeepersConfig, *_fileDistributorConfig, *_rpcConfig)); @@ -204,8 +238,17 @@ public: downloader.setMaxUploadSpeed(config.maxuploadspeed); } + //avoid warning due to scope exit macro +#pragma GCC diagnostic ignored "-Wshadow" void runImpl(const config::ConfigUri & configUri) { - createComponents(configUri); + + BOOST_SCOPE_EXIT((&_components)(&_exceptionRethrower)) { + _components.reset(); + //Ensures that any exception stored during destruction will be available when returning. + ensureExceptionsStored(_exceptionRethrower); + } BOOST_SCOPE_EXIT_END + + createComponents(_exceptionRethrower, configUri); // We do not want back to back reinitializing as it gives zero time for serving // some torrents. @@ -213,15 +256,17 @@ public: while (!askedToShutDown() && (postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) && - !completeReconfigurationNeeded()) - { - postPoneAskedToReinitializedSecs--; - std::this_thread::sleep_for(1s); + !completeReconfigurationNeeded() && + !_exceptionRethrower->exceptionStored()) { + postPoneAskedToReinitializedSecs--; + boost::this_thread::sleep(boost::posix_time::seconds(1)); } - _components.reset(); } }; +//TODO: use pop in gcc 4.6 +#pragma GCC diagnostic warning "-Wshadow" + class FileDistributorApplication : public FastOS_Application { const config::ConfigUri _configUri; public: diff --git a/filedistribution/src/apps/status/status-filedistribution.cpp b/filedistribution/src/apps/status/status-filedistribution.cpp index d7dc62e29c4..90c21623016 100644 --- a/filedistribution/src/apps/status/status-filedistribution.cpp +++ b/filedistribution/src/apps/status/status-filedistribution.cpp @@ -5,18 +5,18 @@ LOG_SETUP("status-filedistribution"); #include <iostream> #include <map> -#include <thread> #include <boost/program_options.hpp> +#include <boost/foreach.hpp> +#include <boost/thread.hpp> +#include <vespa/filedistribution/common/exceptionrethrower.h> #include <vespa/filedistribution/model/zkfacade.h> #include <vespa/filedistribution/model/filedistributionmodel.h> #include <vespa/filedistribution/model/filedistributionmodelimpl.h> #include <zookeeper/zookeeper.h> using namespace filedistribution; -using namespace std::literals; -namespace po = boost::program_options; std::string plural(size_t size) @@ -42,7 +42,7 @@ void printWaitingForHosts(const StatusByHostName& notFinishedHosts) { std::cout <<"Waiting for the following host" <<plural(notFinishedHosts) <<":" <<std::endl; - for (const StatusByHostName::value_type & hostNameAndStatus : notFinishedHosts) { + BOOST_FOREACH(const StatusByHostName::value_type hostNameAndStatus, notFinishedHosts) { std::cout <<hostNameAndStatus.first <<" ("; const HostStatus& hostStatus = hostNameAndStatus.second; @@ -60,9 +60,10 @@ printWaitingForHosts(const StatusByHostName& notFinishedHosts) //TODO:refactor int printStatus(const std::string& zkservers) { - std::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers)); + boost::shared_ptr<ExceptionRethrower> exceptionRethrower; + boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers, exceptionRethrower)); - std::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk)); + boost::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk)); std::vector<std::string> hosts = model->getHosts(); @@ -70,7 +71,7 @@ int printStatus(const std::string& zkservers) StatusByHostName finishedHosts; bool hasStarted = false; - for (const std::string & host : hosts) { + BOOST_FOREACH(std::string host, hosts) { HostStatus hostStatus = model->getHostStatus(host); switch (hostStatus._state) { case HostStatus::finished: @@ -117,7 +118,7 @@ printStatusRetryIfZKProblem(const std::string& zkservers, const std::string& zkL } catch (ZKSessionExpired& e) { LOG(debug, "Session expired."); } - std::this_thread::sleep_for(500ms); + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); } return 4; } @@ -131,11 +132,12 @@ struct ProgramOptionException { {} }; -bool exists(const std::string& optionName, const po::variables_map& map) { +bool exists(const std::string& optionName, const boost::program_options::variables_map& map) { return map.find(optionName) != map.end(); } -void ensureExists(const std::string& optionName, const po::variables_map& map) { +void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map \ + ) { if (!exists(optionName, map)) { throw ProgramOptionException("Error: Missing option " + optionName); } @@ -150,15 +152,18 @@ main(int argc, char** argv) { *zkLogFile = "zkLogFile", *help = "help"; - po::options_description description; + namespace po = boost::program_options; + boost::program_options::options_description description; description.add_options() (zkstring, po::value<std::string > (), "The zookeeper servers to connect to, separated by comma") (zkLogFile, po::value<std::string >() -> default_value("/dev/null"), "Zookeeper log file") (help, "help"); try { - po::variables_map values; - po::store(po::parse_command_line(argc, argv, description), values); + boost::program_options::variables_map values; + po::store( + boost::program_options::parse_command_line(argc, argv, description), + values); if (exists(help, values)) { std::cout <<description; diff --git a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp index 3cf12722d86..84d1b5b958c 100644 --- a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp +++ b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp @@ -20,12 +20,14 @@ namespace { struct Fixture { + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; ComponentsDeleter _componentsDeleter; - std::shared_ptr<ZKFacade> _zk; - std::shared_ptr<FileDistributionModelImpl> _distModel; + boost::shared_ptr<ZKFacade> _zk; + boost::shared_ptr<FileDistributionModelImpl> _distModel; Fixture() { - _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181")); - _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk)); + _exceptionRethrower.reset(new ExceptionRethrower()); + _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower)); + _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk, _exceptionRethrower)); } ~Fixture() { } }; diff --git a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp index c9d93ffc218..94b505f4f9f 100644 --- a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp +++ b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp @@ -10,6 +10,8 @@ #include <boost/test/unit_test.hpp> #include <boost/filesystem.hpp> +#include <boost/thread.hpp> +#include <boost/lambda/bind.hpp> #include <boost/filesystem/fstream.hpp> #include <libtorrent/session.hpp> @@ -18,6 +20,7 @@ #include <vespa/filedistribution/manager/createtorrent.h> #include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> #include <vespa/filedistribution/common/componentsdeleter.h> namespace fs = boost::filesystem; @@ -30,14 +33,15 @@ const int uploaderPort = 9113; const int downloaderPort = 9112; #if 0 -std::shared_ptr<FileDownloader> +boost::shared_ptr<FileDownloader> createDownloader(ComponentsDeleter& deleter, int port, const fs::path& downloaderPath, - const std::shared_ptr<FileDistributionModel>& model) + const boost::shared_ptr<FileDistributionModel>& model, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) { - std::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model))); - std::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker, - localHost, port, downloaderPath))); + boost::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model, exceptionRethrower))); + boost::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker, + localHost, port, downloaderPath, exceptionRethrower))); tracker->setDownloader(downloader); return downloader; @@ -97,22 +101,27 @@ BOOST_AUTO_TEST_CASE(fileDownloaderTest) { Buffer buffer(createTorrent.bencode()); ComponentsDeleter deleter; + boost::shared_ptr<ExceptionRethrower> exceptionRethrower(new ExceptionRethrower()); - std::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel())); - std::shared_ptr<FileDownloader> downloader = - createDownloader(deleter, downloaderPort, downloaderPath, model); + boost::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel())); + boost::shared_ptr<FileDownloader> downloader = + createDownloader(deleter, downloaderPort, downloaderPath, model, exceptionRethrower); - std::shared_ptr<FileDownloader> uploader = - createDownloader(deleter, uploaderPort, uploaderPath, model); + boost::shared_ptr<FileDownloader> uploader = + createDownloader(deleter, uploaderPort, uploaderPath, model, exceptionRethrower); - std::thread uploaderThread( [uploader] () { uploader->runEventLoop(); }); - std::thread downloaderThread( [downloader] () { downloader->runEventLoop(); }); + boost::thread uploaderThread( + boost::lambda::bind(&FileDownloader::runEventLoop, uploader.get())); + + boost::thread downloaderThread( + boost::lambda::bind(&FileDownloader::runEventLoop, downloader.get())); uploader->addTorrent(fileReference, buffer); downloader->addTorrent(fileReference, buffer); sleep(5); BOOST_CHECK(fs::exists(downloaderPath / fileReference / fileToSend)); + BOOST_CHECK(!exceptionRethrower->exceptionStored()); uploaderThread.interrupt(); uploaderThread.join(); diff --git a/filedistribution/src/tests/lib/mock-zookeeper.cpp b/filedistribution/src/tests/lib/mock-zookeeper.cpp index 4d39e41786a..82cd03a268e 100644 --- a/filedistribution/src/tests/lib/mock-zookeeper.cpp +++ b/filedistribution/src/tests/lib/mock-zookeeper.cpp @@ -7,8 +7,7 @@ #include <cstring> #include <vector> -#include <thread> -#include <atomic> +#include <boost/thread.hpp> #include <boost/lexical_cast.hpp> #include <iostream> @@ -56,9 +55,7 @@ struct Node { void triggerWatches(zhandle_t* zh, const std::string& path); }; -std::shared_ptr<Node> sharedRoot; - -void doNothing() { } +boost::shared_ptr<Node> sharedRoot; struct ZHandle { struct Worker { @@ -71,12 +68,11 @@ struct ZHandle { int sequence; - std::shared_ptr<Node> root; - std::atomic<bool> _closed; - std::thread _watchersThread; + boost::shared_ptr<Node> root; + boost::thread _watchersThread; vector<string> ephemeralNodes; - typedef std::function<void (void)> InvokeWatcherFun; + typedef boost::function<void (void)> InvokeWatcherFun; ConcurrentQueue<InvokeWatcherFun> watcherInvocations; Node& getNode(const string& path); @@ -87,7 +83,7 @@ struct ZHandle { ephemeralNodes.push_back(path); } - ZHandle() : sequence(0), _closed(false), _watchersThread(Worker(this)) { + ZHandle() : sequence(0), _watchersThread(Worker(this)) { if (!sharedRoot) sharedRoot.reset(new Node()); @@ -96,21 +92,21 @@ struct ZHandle { ~ZHandle() { std::for_each(ephemeralNodes.begin(), ephemeralNodes.end(), - [this] (const string & s) { zoo_delete((zhandle_t*)this, s.c_str(), 0); }); - close(); + boost::bind(&zoo_delete, (zhandle_t*)this, + boost::bind(&string::c_str, _1), + 0)); + + _watchersThread.interrupt(); _watchersThread.join(); } - void close() { - _closed.store(true); - watcherInvocations.push(std::ref(doNothing)); - } }; void ZHandle::Worker::operator()() { - while (! zhandle._closed.load()) { + while (!boost::this_thread::interruption_requested()) { InvokeWatcherFun fun = zhandle.watcherInvocations.pop(); + boost::this_thread::disable_interruption di; fun(); } } @@ -138,7 +134,10 @@ ZHandle::getParent(const string& childPath) void Node::triggerWatches(zhandle_t* zh, const std::string& path) { for (auto i = watchers.begin(); i != watchers.end(); ++i) { - ((ZHandle*)zh)->watcherInvocations.push([zh, i, path] () { i->first(zh, 0, 0, path.c_str(), i->second); }); + ((ZHandle*)zh)->watcherInvocations.push(boost::bind(i->first, zh, \ + /*TODO: type, state*/ 0, 0, + boost::bind(&string::c_str, path), + i->second)); } watchers.clear(); } diff --git a/filedistribution/src/tests/rpc/testfileprovider.cpp b/filedistribution/src/tests/rpc/testfileprovider.cpp index 6be172d0afd..6881eb96b8a 100644 --- a/filedistribution/src/tests/rpc/testfileprovider.cpp +++ b/filedistribution/src/tests/rpc/testfileprovider.cpp @@ -17,8 +17,8 @@ const std::string MockFileProvider::_queueForeverFileReference("queue-forever"); BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) { const std::string spec("tcp/localhost:9111"); - fd::FileProvider::SP provider(new fd::MockFileProvider()); - fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); + boost::shared_ptr<fd::MockFileProvider> provider(new fd::MockFileProvider()); + boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); fileDistributorRPC->start(); frtstream::FrtClientStream rpc(spec); @@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) { //must be run through valgrind BOOST_AUTO_TEST_CASE(require_that_queued_requests_does_not_leak_memory) { const std::string spec("tcp/localhost:9111"); - std::shared_ptr<MockFileProvider> provider(new MockFileProvider()); - fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); + boost::shared_ptr<MockFileProvider> provider(new MockFileProvider()); + boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); fileDistributorRPC->start(); FRT_Supervisor supervisor; diff --git a/filedistribution/src/tests/scheduler/test-scheduler.cpp b/filedistribution/src/tests/scheduler/test-scheduler.cpp index a9249bbdcae..cc669690a31 100644 --- a/filedistribution/src/tests/scheduler/test-scheduler.cpp +++ b/filedistribution/src/tests/scheduler/test-scheduler.cpp @@ -9,10 +9,8 @@ #include <iostream> #include <boost/thread/barrier.hpp> -#include <thread> using filedistribution::Scheduler; -using namespace std::literals; namespace asio = boost::asio; @@ -27,11 +25,13 @@ struct CallRun { {} void operator()(asio::io_service& ioService) { - try { - //No reset needed after handling exceptions. - ioService.run(); - } catch(const TestException& e ) { - _caughtException = true; + while (!boost::this_thread::interruption_requested()) { + try { + //No reset needed after handling exceptions. + ioService.run(); + } catch(const TestException& e ) { + _caughtException = true; + } } } }; @@ -41,7 +41,7 @@ struct Fixture { Scheduler scheduler; Fixture() - : scheduler(std::ref(callRun)) + : scheduler(boost::ref(callRun)) {} }; @@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(require_exception_from_tasks_can_be_caught) { task->scheduleNow(); for (int i=0; i<200 && !callRun._caughtException; ++i) { - std::this_thread::sleep_for(100ms); + boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100)); } BOOST_CHECK(callRun._caughtException); diff --git a/filedistribution/src/tests/status/test-status.cpp b/filedistribution/src/tests/status/test-status.cpp index 4fbda2cb9c3..7021752f316 100644 --- a/filedistribution/src/tests/status/test-status.cpp +++ b/filedistribution/src/tests/status/test-status.cpp @@ -3,7 +3,9 @@ #define BOOST_TEST_MAIN #include <vespa/fastos/fastos.h> #include <boost/test/unit_test.hpp> +#include <boost/foreach.hpp> +#include <vespa/filedistribution/common/exceptionrethrower.h> #include <vespa/filedistribution/model/zkfacade.h> #include <vespa/filedistribution/model/filedistributionmodel.h> #include <vespa/filedistribution/model/filedistributionmodelimpl.h> diff --git a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp index ada601742db..d45e5059a53 100644 --- a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp +++ b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp @@ -8,6 +8,7 @@ #include <iostream> #include <boost/thread/barrier.hpp> +#include <boost/thread/thread.hpp> #include <boost/checked_delete.hpp> #include <vespa/filedistribution/common/componentsdeleter.h> @@ -16,7 +17,7 @@ #include <zookeeper/zookeeper.h> -using namespace std::literals; + using namespace filedistribution; namespace { @@ -34,13 +35,16 @@ struct Watcher : public ZKFacade::NodeChangedWatcher { }; struct Fixture { + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; ComponentsDeleter _componentsDeleter; - std::shared_ptr<ZKFacade> zk; + boost::shared_ptr<ZKFacade> zk; ZKFacade::Path testNode; Fixture() { + _exceptionRethrower.reset(new ExceptionRethrower()); + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181")); + zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower)); testNode = "/test-node"; zk->removeIfExists(testNode); @@ -70,7 +74,7 @@ BOOST_AUTO_TEST_CASE(hasNode) BOOST_AUTO_TEST_CASE(hasNodeNotification) { - std::shared_ptr<Watcher> watcher(new Watcher); + boost::shared_ptr<Watcher> watcher(new Watcher); zk->hasNode(testNode, watcher); zk->setData(testNode, "", 0); @@ -78,7 +82,7 @@ BOOST_AUTO_TEST_CASE(hasNodeNotification) //after the notification has returned, the watcher must no longer reside in watchers map. for (int i=0; i<20 && !watcher.unique(); ++i) { - std::this_thread::sleep_for(100ms); + boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100)); } BOOST_CHECK(watcher.unique()); } @@ -152,7 +156,8 @@ BOOST_AUTO_TEST_CASE(addEphemeralNode) zk->removeIfExists(ephemeralNode); //Checked deleter is ok here since we're not installing any watchers - ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181"), boost::checked_deleter<ZKFacade>()); + ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181", _exceptionRethrower), + boost::checked_deleter<ZKFacade>()); zk2->addEphemeralNode(ephemeralNode); BOOST_CHECK(zk->hasNode(ephemeralNode)); @@ -164,7 +169,7 @@ BOOST_AUTO_TEST_CASE(addEphemeralNode) BOOST_AUTO_TEST_CASE(dataChangedNotification) { - std::shared_ptr<Watcher> watcher(new Watcher); + boost::shared_ptr<Watcher> watcher(new Watcher); zk->setData(testNode, "", 0); Buffer buffer(zk->getData(testNode, watcher)); @@ -177,7 +182,7 @@ BOOST_AUTO_TEST_CASE(dataChangedNotification) BOOST_AUTO_TEST_CASE(getChildrenNotification) { - std::shared_ptr<Watcher> watcher(new Watcher); + boost::shared_ptr<Watcher> watcher(new Watcher); zk->setData(testNode, "", 0); zk->getChildren(testNode, watcher); @@ -189,9 +194,9 @@ BOOST_AUTO_TEST_CASE(getChildrenNotification) BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback) { struct DeleteZKFacadeWatcher : public Watcher { - std::shared_ptr<ZKFacade> _zk; + boost::shared_ptr<ZKFacade> _zk; - DeleteZKFacadeWatcher(const std::shared_ptr<ZKFacade>& zk) + DeleteZKFacadeWatcher(const boost::shared_ptr<ZKFacade>& zk) :_zk(zk) {} @@ -202,7 +207,7 @@ BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback) } }; - std::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk)); + boost::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk)); zk->setData(testNode, "", 0); zk->getData(testNode, watcher); diff --git a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp index 6a3a87aac96..b385949bb98 100644 --- a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp +++ b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp @@ -7,6 +7,10 @@ #include <iostream> +#include <boost/thread/barrier.hpp> +#include <boost/thread/thread.hpp> +#include <boost/checked_delete.hpp> + #include <vespa/filedistribution/common/componentsdeleter.h> #include <vespa/filedistribution/model/zkfacade.h> #include <vespa/filedistribution/model/zkfiledbmodel.h> @@ -22,13 +26,16 @@ namespace { struct Fixture { + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; ComponentsDeleter _componentsDeleter; - std::shared_ptr<ZKFacade> zk; - std::shared_ptr<ZKFileDBModel> model; + boost::shared_ptr<ZKFacade> zk; + boost::shared_ptr<ZKFileDBModel> model; Fixture() { + _exceptionRethrower.reset(new ExceptionRethrower()); + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181")); + zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower)); zk->setData("/vespa", "", 0); model = _componentsDeleter.track(new ZKFileDBModel(zk)); diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp index 3c6a265941a..74b36b77a72 100644 --- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp +++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp @@ -5,7 +5,9 @@ #include <vespa/log/log.h> LOG_SETUP(".componentsdeleter"); -using namespace std::literals; +#include <boost/foreach.hpp> + + using namespace filedistribution; struct ComponentsDeleter::Worker { @@ -21,21 +23,25 @@ struct ComponentsDeleter::Worker { void ComponentsDeleter::Worker::operator()() { - while ( ! _parent.areWeDone() ) { - CallDeleteFun deleteFun = _parent._deleteRequests.pop(); - deleteFun(); + while (!boost::this_thread::interruption_requested()) { + try { + CallDeleteFun deleteFun = _parent._deleteRequests.pop(); + boost::this_thread::disable_interruption di; + deleteFun(); + } catch(const std::exception& e) { + LOG(error, e.what()); + } } } -ComponentsDeleter::ComponentsDeleter() : - _closed(false), - _deleterThread(Worker(this)) +ComponentsDeleter::ComponentsDeleter() + :_deleterThread(Worker(this)) {} ComponentsDeleter::~ComponentsDeleter() { - close(); waitForAllComponentsDeleted(); + _deleterThread.interrupt(); _deleterThread.join(); } @@ -44,29 +50,31 @@ ComponentsDeleter::waitForAllComponentsDeleted() { LOG(debug, "Waiting for all components to be deleted"); - for (int i=0; i<600 && !areWeDone(); ++i) { - std::this_thread::sleep_for(100ms); + for (int i=0; i<600 && !allComponentsDeleted(); ++i) { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); } LOG(debug, "Done waiting for all components to be deleted"); - assert(_trackedComponents.empty()); - assert(_deleteRequests.empty()); + + logNotDeletedComponents(); + + if (!allComponentsDeleted()) + kill(getpid(), SIGKILL); } - -void -ComponentsDeleter::close() + +bool +ComponentsDeleter::allComponentsDeleted() { - { - LockGuard guard(_trackedComponentsMutex); - _closed = true; - } - _deleteRequests.push([]() { LOG(debug, "I am the last one, hurry up and shutdown"); }); + LockGuard guard(_trackedComponentsMutex); + return _trackedComponents.empty(); } -bool -ComponentsDeleter::areWeDone() +void +ComponentsDeleter::logNotDeletedComponents() { LockGuard guard(_trackedComponentsMutex); - return _closed && _trackedComponents.empty() && _deleteRequests.empty(); + BOOST_FOREACH(TrackedComponentsMap::value_type component, _trackedComponents) { + LOG(info, "Timed out waiting for component '%s' to be deleted", component.second.c_str()); + } } void diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h index 46d79663536..4238f88a05e 100644 --- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h +++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h @@ -4,8 +4,12 @@ #include <map> #include <typeinfo> #include <string> -#include <mutex> -#include <thread> + +#include <boost/function.hpp> +#include <boost/bind.hpp> +#include <boost/checked_delete.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread.hpp> #include "concurrentqueue.h" @@ -18,16 +22,16 @@ namespace filedistribution { */ class ComponentsDeleter { class Worker; - typedef std::lock_guard<std::mutex> LockGuard; + typedef boost::lock_guard<boost::mutex> LockGuard; - std::mutex _trackedComponentsMutex; + boost::mutex _trackedComponentsMutex; typedef std::map<void*, std::string> TrackedComponentsMap; TrackedComponentsMap _trackedComponents; - typedef std::function<void (void)> CallDeleteFun; + typedef boost::function<void (void)> CallDeleteFun; ConcurrentQueue<CallDeleteFun> _deleteRequests; - bool _closed; - std::thread _deleterThread; + + boost::thread _deleterThread; void removeFromTrackedComponents(void* component); @@ -39,12 +43,12 @@ class ComponentsDeleter { template <class T> void requestDelete(T* component) { - _deleteRequests.push([this, component]() { deleteComponent<T>(component); }); + _deleteRequests.push(boost::bind(&ComponentsDeleter::deleteComponent<T>, this, component)); } void waitForAllComponentsDeleted(); - bool areWeDone(); - void close(); + bool allComponentsDeleted(); + void logNotDeletedComponents(); public: ComponentsDeleter(const ComponentsDeleter &) = delete; ComponentsDeleter & operator = (const ComponentsDeleter &) = delete; @@ -57,14 +61,11 @@ class ComponentsDeleter { ~ComponentsDeleter(); template <class T> - std::shared_ptr<T> track(T* t) { + boost::shared_ptr<T> track(T* t) { LockGuard guard(_trackedComponentsMutex); - if (_closed) { - return std::shared_ptr<T>(t); - } _trackedComponents[t] = typeid(t).name(); - return std::shared_ptr<T>(t, [this](T * p) { requestDelete<T>(p); }); + return boost::shared_ptr<T>(t, boost::bind(&ComponentsDeleter::requestDelete<T>, this, t)); } }; } diff --git a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h index 21b8ade0ab0..056ba3153a2 100644 --- a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h +++ b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h @@ -3,8 +3,9 @@ #include <queue> -#include <mutex> -#include <condition_variable> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> namespace filedistribution { @@ -13,10 +14,10 @@ class ConcurrentQueue { public: typedef T value_type; private: - std::condition_variable _nonEmpty; + boost::condition_variable _nonEmpty; - mutable std::mutex _queueMutex; - typedef std::unique_lock<std::mutex> UniqueLock; + mutable boost::mutex _queueMutex; + typedef boost::unique_lock<boost::mutex> UniqueLock; std::queue<value_type> _queue; @@ -46,10 +47,6 @@ public: _queue.pop(); } } - bool empty() { - UniqueLock guard(_queueMutex); - return _queue.empty(); - } }; } //namespace filedistribution diff --git a/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h new file mode 100644 index 00000000000..28b45546a64 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> +#include <boost/exception_ptr.hpp> +#include <boost/type_traits/is_polymorphic.hpp> + +namespace filedistribution { + +//used for rethrowing an exceptions in a different context +class ExceptionRethrower { + boost::exception_ptr _exceptionPtr; //not a pod, default constructed to null value + + mutable boost::mutex _exceptionMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + +public: + void rethrow() const { + LockGuard guard(_exceptionMutex); + + if (_exceptionPtr) + boost::rethrow_exception(_exceptionPtr); + } + + bool exceptionStored() const { + LockGuard guard(_exceptionMutex); + return _exceptionPtr; + } + + template <class T> + void store(const T& exception) { + boost::exception_ptr exceptionPtr = boost::copy_exception(exception); + store(exceptionPtr); + } + + void store(const boost::exception_ptr exceptionPtr) { + LockGuard guard(_exceptionMutex); + + if (!_exceptionPtr) //only store the first exception to be rethrowed. + _exceptionPtr = exceptionPtr; + } +}; + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp index 7eb0ab957ff..055a72e26b3 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp +++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp @@ -15,7 +15,7 @@ using filedistribution::FileDistributorTrackerImpl; using filedistribution::FileDownloader; using filedistribution::FileDistributionModel; using filedistribution::Scheduler; -using filedistribution::TorrentSP; +using filedistribution::ExceptionRethrower; typedef FileDistributionModel::PeerEntries PeerEntries; @@ -60,14 +60,14 @@ struct TrackingTask : public Scheduler::Task { libtorrent::tracker_request _trackerRequest; boost::weak_ptr<libtorrent::torrent> _torrent; - std::weak_ptr<FileDownloader> _downloader; - std::shared_ptr<FileDistributionModel> _model; + boost::weak_ptr<FileDownloader> _downloader; + boost::shared_ptr<FileDistributionModel> _model; TrackingTask(Scheduler& scheduler, const libtorrent::tracker_request& trackerRequest, - const TorrentSP & torrent, - const std::weak_ptr<FileDownloader>& downloader, - const std::shared_ptr<FileDistributionModel>& model) + const boost::shared_ptr<libtorrent::torrent>& torrent, + const boost::weak_ptr<FileDownloader>& downloader, + const boost::shared_ptr<FileDistributionModel>& model) : Task(scheduler), _numTimesRescheduled(0), _trackerRequest(trackerRequest), @@ -78,12 +78,12 @@ struct TrackingTask : public Scheduler::Task { //TODO: refactor void doHandle() { - if (std::shared_ptr<FileDownloader> downloader = _downloader.lock()) { + if (boost::shared_ptr<FileDownloader> downloader = _downloader.lock()) { //All torrents must be destructed before the session is destructed. //It's okay to prevent the torrent from expiring here //since the session can't be destructed while //we hold a shared_ptr to the downloader. - if (TorrentSP torrent = _torrent.lock()) { + if (boost::shared_ptr<libtorrent::torrent> torrent = _torrent.lock()) { PeerEntries peers = getPeers(downloader); if (!peers.empty()) { @@ -108,7 +108,7 @@ struct TrackingTask : public Scheduler::Task { } } - PeerEntries getPeers(const std::shared_ptr<FileDownloader>& downloader) { + PeerEntries getPeers(const boost::shared_ptr<FileDownloader>& downloader) { std::string fileReference = downloader->infoHash2FileReference(_trackerRequest.info_hash); const size_t recommendedMaxNumberOfPeers = 30; @@ -134,12 +134,34 @@ struct TrackingTask : public Scheduler::Task { } }; + +void +workerFunction(boost::shared_ptr<ExceptionRethrower> exceptionRethrower, asio::io_service& ioService) +{ + while (!boost::this_thread::interruption_requested()) { + try { + //No reset needed after handling exceptions. + ioService.run(); + } catch(const boost::thread_interrupted&) { + LOG(debug, "Tracker worker thread interrupted."); + throw; + } catch(...) { + exceptionRethrower->store(boost::current_exception()); + } + } +} + } //anonymous namespace -FileDistributorTrackerImpl::FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model) : + +FileDistributorTrackerImpl::FileDistributorTrackerImpl( + const boost::shared_ptr<FileDistributionModel>& model, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) + :_exceptionRethrower(exceptionRethrower), _model(model) {} + FileDistributorTrackerImpl::~FileDistributorTrackerImpl() { LOG(debug, "Deconstructing FileDistributorTrackerImpl"); @@ -147,23 +169,25 @@ FileDistributorTrackerImpl::~FileDistributorTrackerImpl() { _scheduler.reset(); } + void FileDistributorTrackerImpl::trackingRequest( libtorrent::tracker_request& request, - const TorrentSP & torrent) + const boost::shared_ptr<libtorrent::torrent> & torrent) { LockGuard guard(_mutex); - if (torrent != TorrentSP()) { - std::shared_ptr<TrackingTask> trackingTask(new TrackingTask( + if (torrent != boost::shared_ptr<libtorrent::torrent>()) { + boost::shared_ptr<TrackingTask> trackingTask(new TrackingTask( *_scheduler.get(), request, torrent, _downloader, _model)); trackingTask->scheduleNow(); } } + void -FileDistributorTrackerImpl::setDownloader(const std::shared_ptr<FileDownloader>& downloader) +FileDistributorTrackerImpl::setDownloader(const boost::shared_ptr<FileDownloader>& downloader) { LockGuard guard(_mutex); @@ -171,6 +195,6 @@ FileDistributorTrackerImpl::setDownloader(const std::shared_ptr<FileDownloader>& _downloader = downloader; if (downloader) { - _scheduler.reset(new Scheduler([] (asio::io_service& ioService) { ioService.run(); })); + _scheduler.reset(new Scheduler(boost::bind(&workerFunction, _exceptionRethrower, _1))); } } diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h index bf72a2b80df..edbdb9b8943 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h +++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h @@ -4,38 +4,41 @@ #include <libtorrent/session.hpp> #include <libtorrent/torrent.hpp> +#include <boost/thread.hpp> +#include <boost/shared_ptr.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/deadline_timer.hpp> #include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> #include "scheduler.h" -#include <mutex> namespace filedistribution { class FileDistributionModel; class FileDownloader; -using TorrentSP = boost::shared_ptr<libtorrent::torrent>; - class FileDistributorTrackerImpl : public FileDistributionTracker { - const std::shared_ptr<FileDistributionModel> _model; + const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + const boost::shared_ptr<FileDistributionModel> _model; - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _mutex; - std::weak_ptr<FileDownloader> _downloader; + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _mutex; + boost::weak_ptr<FileDownloader> _downloader; //Use separate worker thread to avoid potential deadlock //between tracker requests and files to download changed requests. boost::scoped_ptr<Scheduler> _scheduler; public: - FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model); + FileDistributorTrackerImpl(const boost::shared_ptr<FileDistributionModel>& model, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower); virtual ~FileDistributorTrackerImpl(); //overrides - void trackingRequest(libtorrent::tracker_request& request, const TorrentSP & torrent); + void trackingRequest(libtorrent::tracker_request& request, + const boost::shared_ptr<libtorrent::torrent> & torrent); - void setDownloader(const std::shared_ptr<FileDownloader>& downloader); + void setDownloader(const boost::shared_ptr<FileDownloader>& downloader); }; } //namespace filedistribution diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp index 7d5d7acceb2..546ae8028f8 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp @@ -9,7 +9,11 @@ #include <boost/filesystem.hpp> #include <boost/filesystem/fstream.hpp> #include <boost/filesystem/convenience.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> #include <boost/function_output_iterator.hpp> +#include <boost/foreach.hpp> +#include <boost/thread.hpp> #include <libtorrent/alert.hpp> #include <libtorrent/alert_types.hpp> @@ -172,9 +176,11 @@ struct FileDownloader::EventHandler void operator()(const libtorrent::save_resume_data_alert& alert) const { defaultHandler(alert); - fs::ofstream resumeFile(resumeDataPathTemp(alert.handle), std::ios_base::binary); + fs::ofstream resumeFile(resumeDataPathTemp(alert.handle), + std::ios_base::binary); resumeFile.unsetf(std::ios_base::skipws); - libtorrent::bencode(std::ostream_iterator<char>(resumeFile), *alert.resume_data); + libtorrent::bencode(std::ostream_iterator<char>(resumeFile), + *alert.resume_data); resumeFile.close(); fs::rename(resumeDataPathTemp(alert.handle), resumeDataPath(alert.handle)); _fileDownloader.didReceiveSRD(); @@ -203,14 +209,15 @@ FileDownloader::LogSessionDeconstructed::~LogSessionDeconstructed() LOG(debug, "Libtorrent session closed successfully."); } -FileDownloader::FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker, +FileDownloader::FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker, const std::string& hostName, int port, - const fs::path& dbPath) + const fs::path& dbPath, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) : _outstanding_SRD_requests(0), _tracker(tracker), _session(tracker.get(), libtorrent::fingerprint("vp", 0, 0, 0, 0), 0), - _closed(false), _dbPath(dbPath), + _exceptionRethrower(exceptionRethrower), _hostName(hostName), _port(port) { @@ -351,17 +358,20 @@ FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain LockGuard guard(_modifyTorrentsDownloadingMutex); std::set<std::string> currentFiles; + namespace ll = boost::lambda; + std::set<sha1_hash> infoHashesToRetain; - for (const std::string& fileReference : filesToRetain) { + BOOST_FOREACH(const std::string& fileReference, filesToRetain) { infoHashesToRetain.insert(toInfoHash(fileReference)); } std::vector<torrent_handle> torrents = _session.get_torrents(); - for (torrent_handle torrent : torrents) { + BOOST_FOREACH(torrent_handle torrent, torrents) { if (!infoHashesToRetain.count(torrent.info_hash())) { LOG(info, "Removing torrent: '%s' with file reference '%s'", - getMainName(torrent).c_str(), fileReferenceToString(torrent.info_hash()).c_str()); + getMainName(torrent).c_str(), + fileReferenceToString(torrent.info_hash()).c_str()); deleteTorrentData(torrent, guard); _session.remove_torrent(torrent); @@ -372,26 +382,20 @@ FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain void FileDownloader::runEventLoop() { EventHandler eventHandler(this); - while ( ! closed() ) { - if (_session.wait_for_alert(libtorrent::milliseconds(100))) { - std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); - eventHandler.handle(std::move(alert)); + try { + while (!boost::this_thread::interruption_requested()) { + if (_session.wait_for_alert(libtorrent::milliseconds(100))) { + std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); + eventHandler.handle(std::move(alert)); + } } + } catch(const boost::thread_interrupted&) { + LOG(spam, "The FileDownloader thread was interrupted."); + } catch(...) { + _exceptionRethrower->store(boost::current_exception()); } } -bool -FileDownloader::closed() const -{ - return _closed.load(); -} - -void -FileDownloader::close() -{ - _closed.store(true); -} - void FileDownloader::signalIfFinishedDownloading(const std::string& fileReference) { boost::optional<fs::path> path = pathToCompletedFile(fileReference); diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h index 38de8ac4357..9056f437664 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h @@ -2,7 +2,7 @@ #pragma once #include <vector> -#include <mutex> +#include <boost/thread/mutex.hpp> #include <boost/filesystem/path.hpp> #include <boost/optional.hpp> #include <boost/multi_index_container.hpp> @@ -15,6 +15,7 @@ #include <vespa/filedistribution/rpc/fileprovider.h> #include "hostname.h" #include <vespa/filedistribution/common/buffer.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> #include <vespa/filedistribution/common/exception.h> #include <vespa/filedistribution/model/filedbmodel.h> @@ -39,15 +40,14 @@ class FileDownloader }; size_t _outstanding_SRD_requests; - std::shared_ptr<FileDistributionTracker> _tracker; + boost::shared_ptr<FileDistributionTracker> _tracker; - std::mutex _modifyTorrentsDownloadingMutex; - typedef std::lock_guard<std::mutex> LockGuard; + boost::mutex _modifyTorrentsDownloadingMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; LogSessionDeconstructed _logSessionDeconstructed; //session is safe to use from multiple threads. libtorrent::session _session; - std::atomic<bool> _closed; const boost::filesystem::path _dbPath; typedef std::vector<char> ResumeDataBuffer; @@ -65,9 +65,10 @@ public: typedef FileProvider::DownloadCompletedSignal DownloadCompletedSignal; typedef FileProvider::DownloadFailedSignal DownloadFailedSignal; - FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker, + FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker, const std::string& hostName, int port, - const boost::filesystem::path& dbPath); + const boost::filesystem::path& dbPath, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower); ~FileDownloader(); DirectoryGuard::UP getGuard() { return std::make_unique<DirectoryGuard>(_dbPath); } @@ -82,8 +83,8 @@ public: std::string infoHash2FileReference(const libtorrent::sha1_hash& hash); void setMaxDownloadSpeed(double MBPerSec); void setMaxUploadSpeed(double MBPerSec); - void close(); - bool closed() const; + + const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; const std::string _hostName; const int _port; diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp index bf17b1bc8d1..7bc57c57dd1 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp @@ -7,12 +7,14 @@ LOG_SETUP(".filedownloadermanager"); #include <iterator> #include <sstream> -#include <thread> - -using namespace std::literals; +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/thread.hpp> using filedistribution::FileDownloaderManager; +namespace lambda = boost::lambda; + namespace { void logStartDownload(const std::set<std::string> & filesToDownload) { std::ostringstream msg; @@ -24,8 +26,8 @@ void logStartDownload(const std::set<std::string> & filesToDownload) { } //anonymous namespace FileDownloaderManager::FileDownloaderManager( - const std::shared_ptr<FileDownloader>& downloader, - const std::shared_ptr<FileDistributionModel>& model) + const boost::shared_ptr<FileDownloader>& downloader, + const boost::shared_ptr<FileDistributionModel>& model) :_fileDownloader(downloader), _fileDistributionModel(model), @@ -40,14 +42,20 @@ FileDownloaderManager::~FileDownloaderManager() { void FileDownloaderManager::start() { - _downloadFailedConnection = downloadFailed().connect( - DownloadFailedSignal::slot_type([&] (const std::string & peer, FileProvider::FailedDownloadReason reason) { (void) reason; removePeerStatus(peer); }).track_foreign(shared_from_this())); - - _downloadCompletedConnection = downloadCompleted().connect( - DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus).track_foreign(shared_from_this())); - - _filesToDownloadChangedConnection = _fileDistributionModel->_filesToDownloadChanged.connect( - FileDistributionModel::FilesToDownloadChangedSignal::slot_type(std::ref(_startDownloads)).track_foreign(shared_from_this())); + _downloadFailedConnection = + downloadFailed().connect( + DownloadFailedSignal::slot_type(lambda::bind(&FileDownloaderManager::removePeerStatus, this, lambda::_1)). + track(shared_from_this())); + + _downloadCompletedConnection = + downloadCompleted().connect( + DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus). + track(shared_from_this())); + + _filesToDownloadChangedConnection = + _fileDistributionModel->_filesToDownloadChanged.connect( + FileDistributionModel::FilesToDownloadChangedSignal::slot_type(boost::ref(_startDownloads)). + track(shared_from_this())); } boost::optional< boost::filesystem::path > @@ -91,6 +99,7 @@ FileDownloaderManager::StartDownloads::downloadFile(const std::string& fileRefer void FileDownloaderManager::StartDownloads::operator()() { + namespace ll = boost::lambda; DirectoryGuard::UP guard = _parent._fileDownloader->getGuard(); LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex); @@ -99,7 +108,7 @@ FileDownloaderManager::StartDownloads::operator()() { logStartDownload(filesToDownload); std::for_each(filesToDownload.begin(), filesToDownload.end(), - [&] (const std::string& file) { downloadFile(file); }); + ll::bind(&StartDownloads::downloadFile, this, ll::_1)); _parent._fileDownloader->removeAllTorrentsBut(filesToDownload); } @@ -126,7 +135,7 @@ FileDownloaderManager::SetFinishedDownloadingStatus::operator()( } catch(const FileDistributionModel::NotPeer&) { //Probably a concurrent removal of the torrent. //improve chance of libtorrent session being updated. - std::this_thread::sleep_for(100ms); + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); if (_parent._fileDownloader->hasTorrent(fileReference)) { _parent._fileDistributionModel->addPeer(fileReference); diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h index 1294f7d7f77..f99888c5a26 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h @@ -1,7 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <boost/thread/mutex.hpp> #include <boost/signals2/signal.hpp> +#include <boost/enable_shared_from_this.hpp> #include <vespa/filedistribution/rpc/fileprovider.h> #include <vespa/filedistribution/model/filedistributionmodel.h> @@ -10,7 +12,7 @@ namespace filedistribution { class FileDownloaderManager : public FileProvider, - public std::enable_shared_from_this<FileDownloaderManager> { + public boost::enable_shared_from_this<FileDownloaderManager> { class StartDownloads { FileDownloaderManager& _parent; @@ -27,11 +29,11 @@ class FileDownloaderManager : public FileProvider, SetFinishedDownloadingStatus(FileDownloaderManager*); }; - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _updateFilesToDownloadMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _updateFilesToDownloadMutex; - std::shared_ptr<FileDownloader> _fileDownloader; - std::shared_ptr<FileDistributionModel> _fileDistributionModel; + boost::shared_ptr<FileDownloader> _fileDownloader; + boost::shared_ptr<FileDistributionModel> _fileDistributionModel; StartDownloads _startDownloads; SetFinishedDownloadingStatus _setFinishedDownloadingStatus; @@ -41,11 +43,10 @@ class FileDownloaderManager : public FileProvider, void removePeerStatus(const std::string& fileReference); public: - using SP = std::shared_ptr<FileDownloaderManager>; FileDownloaderManager(const FileDownloaderManager &) = delete; FileDownloaderManager & operator = (const FileDownloaderManager &) = delete; - FileDownloaderManager(const std::shared_ptr<FileDownloader>&, - const std::shared_ptr<FileDistributionModel>& model); + FileDownloaderManager(const boost::shared_ptr<FileDownloader>&, + const boost::shared_ptr<FileDistributionModel>& model); ~FileDownloaderManager(); void start(); diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp index 4f75afb4850..1ae0b0b1f95 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp +++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp @@ -1,6 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/fastos/fastos.h> #include "scheduler.h" + +#include <boost/bind.hpp> + #include <iostream> namespace asio = boost::asio; @@ -16,8 +19,7 @@ void Task::schedule(asio::deadline_timer::duration_type delay) { _timer.expires_from_now(delay); - std::shared_ptr<Task> self = shared_from_this();; - _timer.async_wait([self](const auto & e) { self->handle(e); }); + _timer.async_wait(boost::bind(&Task::handle, shared_from_this(), _1)); } void @@ -34,13 +36,14 @@ Task::handle(const boost::system::error_code& code) { } -Scheduler::Scheduler(std::function<void (asio::io_service&)> callRun) +Scheduler::Scheduler(boost::function<void (asio::io_service&)> callRun) :_keepAliveWork(ioService), - _workerThread([&, callRun]() { callRun(ioService); }) + _workerThread(boost::bind(callRun, boost::ref(ioService))) {} Scheduler::~Scheduler() { ioService.stop(); + _workerThread.interrupt(); _workerThread.join(); ioService.reset(); } diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h index c2eead235bf..9492a8977d7 100644 --- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h +++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h @@ -3,17 +3,18 @@ #include <boost/asio/io_service.hpp> #include <boost/asio/deadline_timer.hpp> -#include <thread> +#include <boost/enable_shared_from_this.hpp> +#include <boost/thread.hpp> namespace filedistribution { class Scheduler { public: - class Task : public std::enable_shared_from_this<Task> { + class Task : public boost::enable_shared_from_this<Task> { boost::asio::deadline_timer _timer; public: - typedef std::shared_ptr<Task> SP; + typedef boost::shared_ptr<Task> SP; Task(Scheduler& scheduler); @@ -33,12 +34,12 @@ private: //keeps io_service.run() from exiting until it has been destructed, //see http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/reference/io_service.html boost::asio::io_service::work _keepAliveWork; - std::thread _workerThread; + boost::thread _workerThread; public: Scheduler(const Scheduler &) = delete; Scheduler & operator = (const Scheduler &) = delete; - Scheduler(std::function<void (boost::asio::io_service&)> callRun) ; + Scheduler(boost::function<void (boost::asio::io_service&)> callRun) ; ~Scheduler(); }; diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp index fd54b65cdbc..001edd0e20a 100644 --- a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp +++ b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp @@ -10,6 +10,8 @@ #include <string> #include <boost/filesystem/convenience.hpp> +#include <boost/lambda/lambda.hpp> + #include "libtorrent/torrent_info.hpp" namespace fs = boost::filesystem; diff --git a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp index 057902327a9..fb75d88e031 100644 --- a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp +++ b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp @@ -3,6 +3,8 @@ #include <vespa/filedistribution/manager/com_yahoo_vespa_filedistribution_FileDistributionManager.h> #include <memory> +#include <boost/lambda/lambda.hpp> + #include <vespa/filedistribution/model/filedistributionmodel.h> #include <vespa/filedistribution/model/zkfiledbmodel.h> #include <vespa/filedistribution/model/mockfiledistributionmodel.h> @@ -87,7 +89,8 @@ void initMockFileDBModel(NativeFileDistributionManager& manager) void initFileDBModel(NativeFileDistributionManager& manager, const std::string& zkServers) { //Ignored for now, since we're not installing any watchers. - std::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers)); + boost::shared_ptr<ExceptionRethrower> ignoredRethrower(new ExceptionRethrower()); + boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers, ignoredRethrower)); manager._fileDBModel.reset(new ZKFileDBModel(zk)); } } //end anonymous namespace diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp index 1e80ff375a4..733d60d91bb 100644 --- a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp +++ b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp @@ -5,6 +5,9 @@ #include <sstream> #include <iterator> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> + #include <vespa/filedistribution/common/logfwd.h> using filedistribution::DeployedFilesToDownload; @@ -34,7 +37,8 @@ DeployedFilesToDownload::addNewDeployNode(Path parentPath, const FileReferences& std::ostringstream filesStream; if (!files.empty()) { filesStream << files[0]; - std::for_each(files.begin() +1, files.end(), [&](const auto & v) { filesStream << '\n' << v; }); + std::for_each(files.begin() +1, files.end(), + filesStream <<boost::lambda::constant('\n') <<boost::lambda::_1); } Path retPath = _zk.createSequenceNode(path, filesStream.str().c_str(), filesStream.str().length()); return retPath; @@ -70,7 +74,8 @@ DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath, StringVector size_t numberOfNodesToDelete = children.size() - numberOfDeploysToKeepFiles; std::for_each(children.begin(), children.begin() + numberOfNodesToDelete, - [&](const std::string & s) {_zk.remove(parentPath / s); }); + boost::lambda::bind(&ZKFacade::remove, &_zk, + boost::lambda::ret<Path>(parentPath / boost::lambda::_1))); } } diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp index adff69cfc6c..5b9de93249a 100644 --- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp +++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp @@ -8,6 +8,10 @@ #include <cstdlib> #include <boost/filesystem.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> #include <zookeeper/zookeeper.h> #include <vespa/log/log.h> @@ -70,18 +74,18 @@ using filedistribution::FileDistributionModelImpl; struct FileDistributionModelImpl::DeployedFilesChangedCallback : public ZKFacade::NodeChangedWatcher { - typedef std::shared_ptr<DeployedFilesChangedCallback> SP; + typedef boost::shared_ptr<DeployedFilesChangedCallback> SP; - std::weak_ptr<FileDistributionModelImpl> _parent; + boost::weak_ptr<FileDistributionModelImpl> _parent; DeployedFilesChangedCallback( - const std::shared_ptr<FileDistributionModelImpl> & parent) + const boost::shared_ptr<FileDistributionModelImpl> & parent) :_parent(parent) {} //override void operator()() { - if (std::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) { + if (boost::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) { model->_filesToDownloadChanged(); } } @@ -107,7 +111,9 @@ FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t max PeerEntries result; result.reserve(end - peers.begin()); - std::for_each(peers.begin(), end, [&] (const std::string & s) { addPeerEntry(s, result); }); + namespace ll=boost::lambda; + std::for_each(peers.begin(), end, + ll::bind(&addPeerEntry, boost::lambda::_1, boost::ref(result))); LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str()); return result; @@ -223,8 +229,11 @@ FileDistributionModelImpl::addConfigServersAsPeers( void FileDistributionModelImpl::configure(std::unique_ptr<FilereferencesConfig> config) { - const bool changed = updateActiveFileReferences(config->filereferences); - if (changed) { - _filesToDownloadChanged(); + try { + const bool changed = updateActiveFileReferences(config->filereferences); + if (changed) + _filesToDownloadChanged(); + } catch(...) { + _exceptionRethrower->store(boost::current_exception()); } } diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h index 224009822e1..04a111a00df 100644 --- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h +++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h @@ -1,10 +1,13 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <boost/enable_shared_from_this.hpp> + #include "filedistributionmodel.h" #include <vespa/filedistribution/model/config-filereferences.h> #include "zkfacade.h" #include "zkfiledbmodel.h" +#include <vespa/filedistribution/common/exceptionrethrower.h> #include <vespa/config/config.h> using cloud::config::filedistribution::FilereferencesConfig; @@ -13,30 +16,35 @@ namespace filedistribution { class FileDistributionModelImpl : public FileDistributionModel, public config::IFetcherCallback<FilereferencesConfig>, - public std::enable_shared_from_this<FileDistributionModelImpl> + public boost::enable_shared_from_this<FileDistributionModelImpl> { struct DeployedFilesChangedCallback; const std::string _hostName; const int _port; - const std::shared_ptr<ZKFacade> _zk; + const boost::shared_ptr<ZKFacade> _zk; ZKFileDBModel _fileDBModel; - std::mutex _activeFileReferencesMutex; - typedef std::lock_guard<std::mutex> LockGuard; + boost::mutex _activeFileReferencesMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; std::vector<vespalib::string> _activeFileReferences; + const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + bool /*changed*/ updateActiveFileReferences(const std::vector<vespalib::string>& fileReferences); ZKFacade::Path getPeerEntryPath(const std::string& fileReference); public: - FileDistributionModelImpl(const std::string& hostName, int port, const std::shared_ptr<ZKFacade>& zk) + FileDistributionModelImpl(const std::string& hostName, int port, + const boost::shared_ptr<ZKFacade>& zk, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) :_hostName(hostName), _port(port), _zk(zk), - _fileDBModel(_zk) + _fileDBModel(_zk), + _exceptionRethrower(exceptionRethrower) { /* Hack: Force the first call to updateActiveFileReferences to return changed=true when the file references config is empty. diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp index 7e7caf67ff6..ecfb3ca7b44 100644 --- a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp +++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp @@ -9,16 +9,17 @@ #include <cassert> #include <cstdio> #include <sstream> -#include <thread> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> #include <boost/throw_exception.hpp> #include <boost/function_output_iterator.hpp> +#include <boost/thread.hpp> #include <zookeeper/zookeeper.h> #include <vespa/filedistribution/common/logfwd.h> #include <vespa/defaults.h> -#include <vespa/vespalib/util/sync.h> -typedef std::unique_lock<std::mutex> UniqueLock; +typedef boost::unique_lock<boost::mutex> UniqueLock; using filedistribution::ZKFacade; using filedistribution::Move; @@ -135,11 +136,11 @@ setDataForExistingFile(ZKFacade& zk, const Path& path, const char* buffer, int l /********** Active watchers *******************************************/ struct ZKFacade::ZKWatcher { - const std::weak_ptr<ZKFacade> _owner; + const boost::weak_ptr<ZKFacade> _owner; const NodeChangedWatcherSP _nodeChangedWatcher; ZKWatcher( - const std::shared_ptr<ZKFacade> &owner, + const boost::shared_ptr<ZKFacade> &owner, const NodeChangedWatcherSP& nodeChangedWatcher ) :_owner(owner), _nodeChangedWatcher(nodeChangedWatcher) @@ -169,7 +170,7 @@ struct ZKFacade::ZKWatcher { //this will cause infinite waiting. //To avoid this, a custom shared_ptr deleter using a separate deleter thread must be used. - if (std::shared_ptr<ZKFacade> zk = self->_owner.lock()) { + if (boost::shared_ptr<ZKFacade> zk = self->_owner.lock()) { zk->invokeWatcher(watcherContext); } @@ -180,15 +181,15 @@ struct ZKFacade::ZKWatcher { void ZKFacade::stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context) { (void)path; - (void)context; //The ZKFacade won't expire before zookeeper_close has finished. + ZKFacade* self = (ZKFacade*)context; if (type == ZOO_SESSION_EVENT) { LOGFWD(debug, "Zookeeper session event: %d", state); if (state == ZOO_EXPIRED_SESSION_STATE) { - throw ZKSessionExpired(); + self->_exceptionRethrower->store(ZKSessionExpired()); } else if (state == ZOO_AUTH_FAILED_STATE) { - throw ZKGenericException(ZNOAUTH); + self->_exceptionRethrower->store(ZKGenericException(ZNOAUTH)); } } else { LOGFWD(info, "State watching function: Unexpected event: '%d' -- '%d' ", type, state); @@ -200,21 +201,21 @@ void* /* watcherContext */ ZKFacade::registerWatcher(const NodeChangedWatcherSP& watcher) { UniqueLock lock(_watchersMutex); - std::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher)); + boost::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher)); _watchers[zkWatcher.get()] = zkWatcher; return zkWatcher.get(); } -std::shared_ptr<ZKFacade::ZKWatcher> +boost::shared_ptr<ZKFacade::ZKWatcher> ZKFacade::unregisterWatcher(void* watcherContext) { UniqueLock lock(_watchersMutex); WatchersMap::iterator i = _watchers.find(watcherContext); if (i == _watchers.end()) { - return std::shared_ptr<ZKWatcher>(); + return boost::shared_ptr<ZKWatcher>(); } else { - std::shared_ptr<ZKWatcher> result = i->second; + boost::shared_ptr<ZKWatcher> result = i->second; _watchers.erase(i); return result; } @@ -222,24 +223,30 @@ ZKFacade::unregisterWatcher(void* watcherContext) { void ZKFacade::invokeWatcher(void* watcherContext) { - std::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext); + try { + boost::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext); - if (!_watchersEnabled) - return; + if (!_watchersEnabled) + return; - if (watcher) { - (*watcher->_nodeChangedWatcher)(); - } else { - LOGFWD(error, "Invoke called on expired watcher."); + if (watcher) { + (*watcher->_nodeChangedWatcher)(); + } else { + LOGFWD(error, "Invoke called on expired watcher."); + } + } catch(...) { + _exceptionRethrower->store(boost::current_exception()); } } /********** End live watchers ***************************************/ -ZKFacade::ZKFacade(const std::string& zkservers) +ZKFacade::ZKFacade(const std::string& zkservers, + const boost::shared_ptr<ExceptionRethrower> &exceptionRethrower) :_retriesEnabled(true), _watchersEnabled(true), + _exceptionRethrower(exceptionRethrower), _zhandle(zookeeper_init(zkservers.c_str(), &ZKFacade::stateWatchingFun, _zkSessionTimeOut, @@ -255,15 +262,14 @@ ZKFacade::ZKFacade(const std::string& zkservers) ZKFacade::~ZKFacade() { disableRetries(); _watchersEnabled = false; - vespalib::Gate done; - std::thread closer([&done, zhandle=_zhandle] () { zookeeper_close(zhandle); done.countDown(); }); - if ( done.await(50*1000) ) { + + boost::thread shutdownCaller(zookeeper_close, _zhandle); + if (shutdownCaller.timed_join(boost::posix_time::seconds(120))) { LOGFWD(debug, "Zookeeper connection closed successfully."); } else { - LOGFWD(error, "Not able to close down zookeeper. Dumping core so you can figure out what is wrong"); + LOGFWD(info, "Timed out waiting for the zookeeper connection to shut down."); abort(); } - closer.join(); } const std::string @@ -439,9 +445,13 @@ ZKFacade::addEphemeralNode(const Path& path) { void ZKFacade::remove(const Path& path) { + namespace ll = boost::lambda; + std::vector< std::string > children = getChildren(path); if (!children.empty()) { - std::for_each(children.begin(), children.end(), [&](const std::string & s){ remove(path / s); }); + std::for_each(children.begin(), children.end(), + ll::bind(&ZKFacade::remove, this, + ll::ret<Path>(path / ll::_1))); } try { @@ -483,9 +493,12 @@ ZKFacade::retainOnly(const Path& path, const std::vector<std::string>& childrenT Children toPreserveSorted(childrenToPreserve); std::sort(toPreserveSorted.begin(), toPreserveSorted.end()); + namespace ll = boost::lambda; std::set_difference(current.begin(), current.end(), toPreserveSorted.begin(), toPreserveSorted.end(), - boost::make_function_output_iterator([&](const std::string & s){ remove(path / s); })); + boost::make_function_output_iterator( + ll::bind(&ZKFacade::remove, this, + ll::ret<Path>(path / ll::_1)))); } std::vector< std::string > diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.h b/filedistribution/src/vespa/filedistribution/model/zkfacade.h index 7631fa6d9dc..e46ed42fdec 100644 --- a/filedistribution/src/vespa/filedistribution/model/zkfacade.h +++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.h @@ -3,12 +3,13 @@ #include <string> #include <vector> -#include <mutex> #include <boost/filesystem/path.hpp> #include <boost/signals2.hpp> +#include <boost/enable_shared_from_this.hpp> #include <vespa/filedistribution/common/buffer.h> #include <vespa/filedistribution/common/exception.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> struct _zhandle; typedef _zhandle zhandle_t; @@ -58,10 +59,11 @@ diagnosticUserLevelMessage(const ZKException& zk); -class ZKFacade : public std::enable_shared_from_this<ZKFacade> { +class ZKFacade : public boost::enable_shared_from_this<ZKFacade> { volatile bool _retriesEnabled; volatile bool _watchersEnabled; + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; zhandle_t* _zhandle; const static int _zkSessionTimeOut = 30 * 1000; const static size_t _maxDataSize = 1024 * 1024; @@ -69,7 +71,7 @@ class ZKFacade : public std::enable_shared_from_this<ZKFacade> { class ZKWatcher; static void stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context); public: - typedef std::shared_ptr<ZKFacade> SP; + typedef boost::shared_ptr<ZKFacade> SP; /* Lifetime is managed by ZKFacade. Derived classes should only contain weak_ptrs to other objects @@ -84,12 +86,12 @@ public: virtual void operator()() = 0; }; - typedef std::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP; + typedef boost::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP; typedef boost::filesystem::path Path; ZKFacade(const ZKFacade &) = delete; ZKFacade & operator = (const ZKFacade &) = delete; - ZKFacade(const std::string& zkservers); + ZKFacade(const std::string& zkservers, const boost::shared_ptr<ExceptionRethrower> &); ~ZKFacade(); bool hasNode(const Path&); @@ -123,11 +125,11 @@ public: private: void* registerWatcher(const NodeChangedWatcherSP &); //returns watcherContext - std::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext); + boost::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext); void invokeWatcher(void* watcherContext); - std::mutex _watchersMutex; - typedef std::map<void*, std::shared_ptr<ZKWatcher> > WatchersMap; + boost::mutex _watchersMutex; + typedef std::map<void*, boost::shared_ptr<ZKWatcher> > WatchersMap; WatchersMap _watchers; }; diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp index 70827305138..f9a4c777b30 100644 --- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp +++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp @@ -4,6 +4,9 @@ #include <ostream> #include <algorithm> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/foreach.hpp> #include "zkfacade.h" #include "zkfiledbmodel.h" @@ -204,7 +207,7 @@ ZKFileDBModel::getHostStatus(const std::string& hostName) { hostStatus._numFilesToDownload = filesToDownload.size(); hostStatus._numFilesFinished = 0; - for (const std::string & file : filesToDownload) { + BOOST_FOREACH(std::string file, filesToDownload) { Path path = getPeersPath(file); const PeerEntries peerEntries = getSortedChildren(*_zk, path); @@ -239,7 +242,7 @@ ZKFileDBModel::cleanFiles( _zk->retainOnly(_fileDBPath, filesToPreserve); } -ZKFileDBModel::ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk) +ZKFileDBModel::ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk) : _zk(zk) { createNode(_root, *_zk); @@ -257,7 +260,8 @@ ZKFileDBModel::getProgress(const Path& path) { else if (buffer.size() == 0) return 0; else { - throw boost::enable_current_exception(InvalidProgressException()) <<errorinfo::Path(path); + throw boost::enable_current_exception(InvalidProgressException()) + <<errorinfo::Path(path); } } catch (ZKNodeDoesNotExistsException& e) { //progress information deleted @@ -277,7 +281,7 @@ ZKFileDBModel::getProgress(const std::string& fileReference, const PeerEntries peerEntries = getSortedChildren(*_zk, path); PeerEntries::const_iterator current = peerEntries.begin(); - for (const std::string& host : hostsSortedAscending) { + BOOST_FOREACH(const std::string& host, hostsSortedAscending) { PeerEntries::const_iterator candidate = std::lower_bound(current, peerEntries.end(), host); diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h index 4249410c00e..cf180f4c780 100644 --- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h +++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h @@ -1,6 +1,8 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <boost/shared_ptr.hpp> + #include "filedistributionmodel.h" #include "zkfacade.h" @@ -10,7 +12,7 @@ class ZKFileDBModel : public FileDBModel { public: typedef boost::filesystem::path Path; private: - const std::shared_ptr<ZKFacade> _zk; + const boost::shared_ptr<ZKFacade> _zk; char getProgress(const Path& path); void removeDeployFileNodes(const Path& hostPath, const std::string& appId); void removeLegacyDeployFileNodes(const Path& hostPath); @@ -47,7 +49,7 @@ public: std::vector<std::string> getHosts(); HostStatus getHostStatus(const std::string& hostName); - ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk); + ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk); Progress getProgress(const std::string& fileReference, const std::vector<std::string>& hostsSortedAscending); diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp index 6579ea06f31..4e63b90d8b9 100644 --- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp +++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp @@ -3,6 +3,10 @@ #include "filedistributorrpc.h" #include <boost/optional.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/foreach.hpp> #include <boost/exception/diagnostic_information.hpp> #include <vespa/log/log.h> @@ -16,12 +20,10 @@ LOG_SETUP(".filedistributorrpc"); #include <vespa/filedistribution/model/filedbmodel.h> using filedistribution::FileDistributorRPC; -using filedistribution::FileProvider; - -namespace fs = boost::filesystem; +namespace ll = boost::lambda; namespace { -typedef std::lock_guard<std::mutex> LockGuard; +typedef boost::lock_guard<boost::mutex> LockGuard; struct RPCErrorCodes { const static uint32_t baseErrorCode = 0x10000; @@ -33,7 +35,7 @@ struct RPCErrorCodes { class QueuedRequests { bool _shuttingDown; - std::mutex _mutex; + boost::mutex _mutex; typedef std::multimap<std::string, FRT_RPCRequest*> Map; Map _queuedRequests; @@ -44,8 +46,7 @@ class QueuedRequests { typedef Map::iterator iterator; std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference); - for (iterator it(range.first); it != range.second; it++) { - const Map::value_type & request(*it); + BOOST_FOREACH( const Map::value_type& request, range) { LOG(info, "Returning earlier enqueued request for file reference '%s'.", request.first.c_str()); func(*request.second); request.second->Return(); @@ -69,14 +70,14 @@ class QueuedRequests { }; struct DownloadFailed { - FileProvider::FailedDownloadReason _reason; + filedistribution::FileProvider::FailedDownloadReason _reason; void operator()(FRT_RPCRequest& request) { LOG(info, "Download failed: '%d'", _reason); request.SetError(RPCErrorCodes::baseFileProviderErrorCode + _reason, "Download failed"); } - DownloadFailed(FileProvider::FailedDownloadReason reason) + DownloadFailed(filedistribution::FileProvider::FailedDownloadReason reason) :_reason(reason) {} }; @@ -115,13 +116,15 @@ public: _queuedRequests.erase(candidate); } - void downloadFinished(const std::string& fileReference, const fs::path& path) { + void downloadFinished(const std::string& fileReference, + const boost::filesystem::path& path) { DownloadFinished handler(path.string()); returnAnswer(fileReference, handler); } - void downloadFailed(const std::string& fileReference, FileProvider::FailedDownloadReason reason) { + void downloadFailed(const std::string& fileReference, + filedistribution::FileProvider::FailedDownloadReason reason) { DownloadFailed handler(reason); returnAnswer(fileReference, handler); @@ -131,7 +134,7 @@ public: LockGuard guard(_mutex); _shuttingDown = true; - for (const Map::value_type& request : _queuedRequests) { + BOOST_FOREACH( const Map::value_type& request, _queuedRequests) { LOG(info, "Shutdown: Aborting earlier enqueued request for file reference '%s'.", request.first.c_str()); abort(request.second); } @@ -143,7 +146,7 @@ public: class FileDistributorRPC::Server : public FRT_Invokable { public: - FileProvider::SP _fileProvider; + boost::shared_ptr<FileProvider> _fileProvider; std::unique_ptr<FRT_Supervisor> _supervisor; QueuedRequests _queuedRequests; @@ -156,15 +159,16 @@ class FileDistributorRPC::Server : public FRT_Invokable { Server(const Server &) = delete; Server & operator = (const Server &) = delete; - Server(int listen_port, const FileProvider::SP & provider); - void start(const FileDistributorRPC::SP & parent); + Server(int listen_port, const boost::shared_ptr<FileProvider>& provider); + void start(const boost::shared_ptr<FileDistributorRPC> parent); ~Server(); void waitFor(FRT_RPCRequest*); }; FileDistributorRPC:: -Server::Server(int listen_port, const FileProvider::SP & provider) +Server::Server(int listen_port, + const boost::shared_ptr<filedistribution::FileProvider>& provider) :_fileProvider(provider), _supervisor(new FRT_Supervisor()) { @@ -174,7 +178,8 @@ Server::Server(int listen_port, const FileProvider::SP & provider) } -FileDistributorRPC::Server::~Server() { +FileDistributorRPC:: +Server::~Server() { _queuedRequests.shutdown(); const bool waitForFinished = true; @@ -182,16 +187,16 @@ FileDistributorRPC::Server::~Server() { } void -FileDistributorRPC::Server::start(const FileDistributorRPC::SP & parent) { +FileDistributorRPC::Server::start(const boost::shared_ptr<FileDistributorRPC> parent) { _downloadCompletedConnection = _fileProvider->downloadCompleted().connect(FileProvider::DownloadCompletedSignal::slot_type( - [&] (const std::string &file, const fs::path& path) { _queuedRequests.downloadFinished(file, path); }) - .track_foreign(parent)); + ll::bind(&QueuedRequests::downloadFinished, &_queuedRequests, ll::_1, ll::_2)). + track(parent)); _downloadFailedConnection = _fileProvider->downloadFailed().connect(FileProvider::DownloadFailedSignal::slot_type( - [&] (const std::string& file, FileProvider::FailedDownloadReason reason) { _queuedRequests.downloadFailed(file, reason); }) - .track_foreign(parent)); + ll::bind(&QueuedRequests::downloadFailed, &_queuedRequests, ll::_1, ll::_2)). + track(parent)); } @@ -209,7 +214,8 @@ Server::queueRequest(const std::string& fileReference, FRT_RPCRequest* request) } void -FileDistributorRPC::Server::defineMethods() { +FileDistributorRPC:: +Server::defineMethods() { const bool instant = true; FRT_ReflectionBuilder builder(_supervisor.get()); builder.DefineMethod("waitFor", "s", "s", instant, @@ -217,12 +223,13 @@ FileDistributorRPC::Server::defineMethods() { } void -FileDistributorRPC::Server::waitFor(FRT_RPCRequest* request) { +FileDistributorRPC:: +Server::waitFor(FRT_RPCRequest* request) { try { frtstream::FrtServerStream requestHandler(request); std::string fileReference; requestHandler >> fileReference; - boost::optional<fs::path> path + boost::optional<boost::filesystem::path> path = _fileProvider->getPath(fileReference); if (path) { LOG(debug, "Returning request for file reference '%s'.", fileReference.c_str()); @@ -246,7 +253,7 @@ FileDistributorRPC::Server::waitFor(FRT_RPCRequest* request) { } FileDistributorRPC::FileDistributorRPC(const std::string& connectionSpec, - const FileProvider::SP & provider) + const boost::shared_ptr<filedistribution::FileProvider>& provider) :_server(new Server(get_port(connectionSpec), provider)) {} diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h index 3c780bf5878..8c492ad4b5d 100644 --- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h +++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h @@ -8,14 +8,13 @@ namespace filedistribution { -class FileDistributorRPC : public std::enable_shared_from_this<FileDistributorRPC> +class FileDistributorRPC : public boost::enable_shared_from_this<FileDistributorRPC> { class Server; public: - using SP = std::shared_ptr<FileDistributorRPC>; FileDistributorRPC(const FileDistributorRPC &) = delete; FileDistributorRPC & operator = (const FileDistributorRPC &) = delete; - FileDistributorRPC(const std::string& connectSpec, const FileProvider::SP & provider); + FileDistributorRPC(const std::string& connectSpec, const boost::shared_ptr<FileProvider>& provider); void start(); diff --git a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h index 4eeeee5e359..a95b50fc0f2 100644 --- a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h +++ b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h @@ -10,7 +10,6 @@ namespace filedistribution { class FileProvider { public: - using SP = std::shared_ptr<FileProvider>; typedef boost::signals2::signal<void (const std::string& /* fileReference */, const boost::filesystem::path&)> DownloadCompletedSignal; |