summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2016-09-15 16:45:03 +0200
committerGitHub <noreply@github.com>2016-09-15 16:45:03 +0200
commit75e9061a9f1a716d3bd79a3496d07f0ad7256e46 (patch)
tree07a26b47994dc53e8288c9372c3e8febdade6ad5 /filedistribution
parentbf1333c9d6fe3ebd5f736035857f155131702961 (diff)
Revert "Balder/remove boost noncopyable 2"
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/apps/filedistributor/filedistributor.cpp105
-rw-r--r--filedistribution/src/apps/status/status-filedistribution.cpp31
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp10
-rw-r--r--filedistribution/src/tests/filedownloader/testfiledownloader.cpp33
-rw-r--r--filedistribution/src/tests/lib/mock-zookeeper.cpp35
-rw-r--r--filedistribution/src/tests/rpc/testfileprovider.cpp8
-rw-r--r--filedistribution/src/tests/scheduler/test-scheduler.cpp18
-rw-r--r--filedistribution/src/tests/status/test-status.cpp2
-rw-r--r--filedistribution/src/tests/zkfacade/test-zkfacade.cpp27
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp13
-rw-r--r--filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp54
-rw-r--r--filedistribution/src/vespa/filedistribution/common/componentsdeleter.h31
-rw-r--r--filedistribution/src/vespa/filedistribution/common/concurrentqueue.h15
-rw-r--r--filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h47
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp54
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h23
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp52
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.h19
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp39
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h17
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp11
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/scheduler.h11
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp2
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp5
-rw-r--r--filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp9
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp25
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h20
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfacade.cpp69
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfacade.h18
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp12
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h6
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp59
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h5
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/fileprovider.h1
34 files changed, 556 insertions, 330 deletions
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp
index 0c89fb08b6b..62def74ee7a 100644
--- a/filedistribution/src/apps/filedistributor/filedistributor.cpp
+++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp
@@ -5,6 +5,11 @@
#include <cstdlib>
#include <boost/program_options.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/scope_exit.hpp>
@@ -33,7 +38,7 @@ const char* programName = "filedistributor";
#include <vespa/log/log.h>
LOG_SETUP(programName);
-using namespace std::literals;
+namespace ll = boost::lambda;
using namespace filedistribution;
using cloud::config::ZookeepersConfig;
@@ -48,20 +53,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
class Components {
ComponentsDeleter _componentsDeleter;
public:
- const std::shared_ptr<ZKFacade> _zk;
- const std::shared_ptr<FileDistributionModelImpl> _model;
- const std::shared_ptr<FileDistributorTrackerImpl> _tracker;
- const std::shared_ptr<FileDownloader> _downloader;
- const FileDownloaderManager::SP _manager;
- const FileDistributorRPC::SP _rpcHandler;
- const std::shared_ptr<StateServerImpl> _stateServer;
+ const boost::shared_ptr<ZKFacade> _zk;
+ const boost::shared_ptr<FileDistributionModelImpl> _model;
+ const boost::shared_ptr<FileDistributorTrackerImpl> _tracker;
+ const boost::shared_ptr<FileDownloader> _downloader;
+ const boost::shared_ptr<FileDownloaderManager> _manager;
+ const boost::shared_ptr<FileDistributorRPC> _rpcHandler;
+ const boost::shared_ptr<StateServerImpl> _stateServer;
private:
- std::thread _downloaderEventLoopThread;
+ boost::thread _downloaderEventLoopThread;
config::ConfigFetcher _configFetcher;
+
template <class T>
- typename std::shared_ptr<T> track(T* component) {
+ typename boost::shared_ptr<T> track(T* component) {
return _componentsDeleter.track(component);
}
@@ -69,24 +75,29 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
Components(const Components &) = delete;
Components & operator = (const Components &) = delete;
- Components(const config::ConfigUri & configUri,
+ Components(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower,
+ const config::ConfigUri & configUri,
const ZookeepersConfig& zooKeepersConfig,
const FiledistributorConfig& fileDistributorConfig,
const FiledistributorrpcConfig& rpcConfig)
- :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist))),
+ :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, exceptionRethrower))),
_model(track(new FileDistributionModelImpl(
fileDistributorConfig.hostname,
fileDistributorConfig.torrentport,
- _zk))),
- _tracker(track(new FileDistributorTrackerImpl(_model))),
- _downloader(track(new FileDownloader(_tracker,
- fileDistributorConfig.hostname,
- fileDistributorConfig.torrentport,
- boost::filesystem::path(fileDistributorConfig.filedbpath)))),
+ _zk,
+ exceptionRethrower))),
+ _tracker(track(new FileDistributorTrackerImpl(_model, exceptionRethrower))),
+ _downloader(track(new FileDownloader(
+ _tracker,
+ fileDistributorConfig.hostname,
+ fileDistributorConfig.torrentport,
+ boost::filesystem::path(fileDistributorConfig.filedbpath),
+ exceptionRethrower))),
_manager(track(new FileDownloaderManager(_downloader, _model))),
_rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))),
_stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))),
- _downloaderEventLoopThread([downloader=_downloader] () { downloader->runEventLoop(); }),
+ _downloaderEventLoopThread(
+ ll::bind(&FileDownloader::runEventLoop, _downloader.get())),
_configFetcher(configUri.getContext())
{
@@ -109,20 +120,21 @@ class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
//Do not waste time retrying zookeeper operations when going down.
_zk->disableRetries();
- _downloader->close();
+ _downloaderEventLoopThread.interrupt();
_downloaderEventLoopThread.join();
}
};
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _configMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _configMutex;
bool _completeReconfigurationNeeded;
std::unique_ptr<ZookeepersConfig> _zooKeepersConfig;
std::unique_ptr<FiledistributorConfig> _fileDistributorConfig;
std::unique_ptr<FiledistributorrpcConfig> _rpcConfig;
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
std::unique_ptr<Components> _components;
public:
FileDistributor(const FileDistributor &) = delete;
@@ -133,6 +145,7 @@ public:
_zooKeepersConfig(),
_fileDistributorConfig(),
_rpcConfig(),
+ _exceptionRethrower(),
_components()
{ }
@@ -174,14 +187,35 @@ public:
void run(const config::ConfigUri & configUri) {
while (!askedToShutDown()) {
clearReinitializeFlag();
+ _exceptionRethrower.reset(new ExceptionRethrower());
runImpl(configUri);
+
+ if (_exceptionRethrower->exceptionStored())
+ _exceptionRethrower->rethrow();
}
}
- void createComponents(const config::ConfigUri & configUri) {
+ static void ensureExceptionsStored(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) {
+ //TODO: this is somewhat hackish, refactor to eliminate this later.
+ LOG(debug, "Waiting for shutdown");
+ for (int i=0;
+ i<50 && !exceptionRethrower.unique();
+ ++i) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+ }
+ LOG(debug, "Done waiting for shutdown");
+
+ if (!exceptionRethrower.unique()) {
+ EV_STOPPING(programName, "Forced termination");
+ kill(getpid(), SIGKILL);
+ }
+ }
+
+ void createComponents(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, const config::ConfigUri & configUri) {
LockGuard guard(_configMutex);
_components.reset(
- new Components(configUri,
+ new Components(exceptionRethrower,
+ configUri,
*_zooKeepersConfig,
*_fileDistributorConfig,
*_rpcConfig));
@@ -204,8 +238,17 @@ public:
downloader.setMaxUploadSpeed(config.maxuploadspeed);
}
+ //avoid warning due to scope exit macro
+#pragma GCC diagnostic ignored "-Wshadow"
void runImpl(const config::ConfigUri & configUri) {
- createComponents(configUri);
+
+ BOOST_SCOPE_EXIT((&_components)(&_exceptionRethrower)) {
+ _components.reset();
+ //Ensures that any exception stored during destruction will be available when returning.
+ ensureExceptionsStored(_exceptionRethrower);
+ } BOOST_SCOPE_EXIT_END
+
+ createComponents(_exceptionRethrower, configUri);
// We do not want back to back reinitializing as it gives zero time for serving
// some torrents.
@@ -213,15 +256,17 @@ public:
while (!askedToShutDown() &&
(postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) &&
- !completeReconfigurationNeeded())
- {
- postPoneAskedToReinitializedSecs--;
- std::this_thread::sleep_for(1s);
+ !completeReconfigurationNeeded() &&
+ !_exceptionRethrower->exceptionStored()) {
+ postPoneAskedToReinitializedSecs--;
+ boost::this_thread::sleep(boost::posix_time::seconds(1));
}
- _components.reset();
}
};
+//TODO: use pop in gcc 4.6
+#pragma GCC diagnostic warning "-Wshadow"
+
class FileDistributorApplication : public FastOS_Application {
const config::ConfigUri _configUri;
public:
diff --git a/filedistribution/src/apps/status/status-filedistribution.cpp b/filedistribution/src/apps/status/status-filedistribution.cpp
index d7dc62e29c4..90c21623016 100644
--- a/filedistribution/src/apps/status/status-filedistribution.cpp
+++ b/filedistribution/src/apps/status/status-filedistribution.cpp
@@ -5,18 +5,18 @@ LOG_SETUP("status-filedistribution");
#include <iostream>
#include <map>
-#include <thread>
#include <boost/program_options.hpp>
+#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
#include <vespa/filedistribution/model/zkfacade.h>
#include <vespa/filedistribution/model/filedistributionmodel.h>
#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
#include <zookeeper/zookeeper.h>
using namespace filedistribution;
-using namespace std::literals;
-namespace po = boost::program_options;
std::string
plural(size_t size)
@@ -42,7 +42,7 @@ void
printWaitingForHosts(const StatusByHostName& notFinishedHosts)
{
std::cout <<"Waiting for the following host" <<plural(notFinishedHosts) <<":" <<std::endl;
- for (const StatusByHostName::value_type & hostNameAndStatus : notFinishedHosts) {
+ BOOST_FOREACH(const StatusByHostName::value_type hostNameAndStatus, notFinishedHosts) {
std::cout <<hostNameAndStatus.first <<" (";
const HostStatus& hostStatus = hostNameAndStatus.second;
@@ -60,9 +60,10 @@ printWaitingForHosts(const StatusByHostName& notFinishedHosts)
//TODO:refactor
int printStatus(const std::string& zkservers)
{
- std::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers));
+ boost::shared_ptr<ExceptionRethrower> exceptionRethrower;
+ boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers, exceptionRethrower));
- std::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk));
+ boost::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk));
std::vector<std::string> hosts = model->getHosts();
@@ -70,7 +71,7 @@ int printStatus(const std::string& zkservers)
StatusByHostName finishedHosts;
bool hasStarted = false;
- for (const std::string & host : hosts) {
+ BOOST_FOREACH(std::string host, hosts) {
HostStatus hostStatus = model->getHostStatus(host);
switch (hostStatus._state) {
case HostStatus::finished:
@@ -117,7 +118,7 @@ printStatusRetryIfZKProblem(const std::string& zkservers, const std::string& zkL
} catch (ZKSessionExpired& e) {
LOG(debug, "Session expired.");
}
- std::this_thread::sleep_for(500ms);
+ boost::this_thread::sleep(boost::posix_time::milliseconds(500));
}
return 4;
}
@@ -131,11 +132,12 @@ struct ProgramOptionException {
{}
};
-bool exists(const std::string& optionName, const po::variables_map& map) {
+bool exists(const std::string& optionName, const boost::program_options::variables_map& map) {
return map.find(optionName) != map.end();
}
-void ensureExists(const std::string& optionName, const po::variables_map& map) {
+void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map \
+ ) {
if (!exists(optionName, map)) {
throw ProgramOptionException("Error: Missing option " + optionName);
}
@@ -150,15 +152,18 @@ main(int argc, char** argv) {
*zkLogFile = "zkLogFile",
*help = "help";
- po::options_description description;
+ namespace po = boost::program_options;
+ boost::program_options::options_description description;
description.add_options()
(zkstring, po::value<std::string > (), "The zookeeper servers to connect to, separated by comma")
(zkLogFile, po::value<std::string >() -> default_value("/dev/null"), "Zookeeper log file")
(help, "help");
try {
- po::variables_map values;
- po::store(po::parse_command_line(argc, argv, description), values);
+ boost::program_options::variables_map values;
+ po::store(
+ boost::program_options::parse_command_line(argc, argv, description),
+ values);
if (exists(help, values)) {
std::cout <<description;
diff --git a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
index 3cf12722d86..84d1b5b958c 100644
--- a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
+++ b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
@@ -20,12 +20,14 @@ namespace {
struct Fixture {
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
ComponentsDeleter _componentsDeleter;
- std::shared_ptr<ZKFacade> _zk;
- std::shared_ptr<FileDistributionModelImpl> _distModel;
+ boost::shared_ptr<ZKFacade> _zk;
+ boost::shared_ptr<FileDistributionModelImpl> _distModel;
Fixture() {
- _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181"));
- _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk));
+ _exceptionRethrower.reset(new ExceptionRethrower());
+ _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower));
+ _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk, _exceptionRethrower));
}
~Fixture() { }
};
diff --git a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
index c9d93ffc218..94b505f4f9f 100644
--- a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
+++ b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
@@ -10,6 +10,8 @@
#include <boost/test/unit_test.hpp>
#include <boost/filesystem.hpp>
+#include <boost/thread.hpp>
+#include <boost/lambda/bind.hpp>
#include <boost/filesystem/fstream.hpp>
#include <libtorrent/session.hpp>
@@ -18,6 +20,7 @@
#include <vespa/filedistribution/manager/createtorrent.h>
#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
#include <vespa/filedistribution/common/componentsdeleter.h>
namespace fs = boost::filesystem;
@@ -30,14 +33,15 @@ const int uploaderPort = 9113;
const int downloaderPort = 9112;
#if 0
-std::shared_ptr<FileDownloader>
+boost::shared_ptr<FileDownloader>
createDownloader(ComponentsDeleter& deleter,
int port, const fs::path& downloaderPath,
- const std::shared_ptr<FileDistributionModel>& model)
+ const boost::shared_ptr<FileDistributionModel>& model,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
{
- std::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model)));
- std::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker,
- localHost, port, downloaderPath)));
+ boost::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model, exceptionRethrower)));
+ boost::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker,
+ localHost, port, downloaderPath, exceptionRethrower)));
tracker->setDownloader(downloader);
return downloader;
@@ -97,22 +101,27 @@ BOOST_AUTO_TEST_CASE(fileDownloaderTest) {
Buffer buffer(createTorrent.bencode());
ComponentsDeleter deleter;
+ boost::shared_ptr<ExceptionRethrower> exceptionRethrower(new ExceptionRethrower());
- std::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel()));
- std::shared_ptr<FileDownloader> downloader =
- createDownloader(deleter, downloaderPort, downloaderPath, model);
+ boost::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel()));
+ boost::shared_ptr<FileDownloader> downloader =
+ createDownloader(deleter, downloaderPort, downloaderPath, model, exceptionRethrower);
- std::shared_ptr<FileDownloader> uploader =
- createDownloader(deleter, uploaderPort, uploaderPath, model);
+ boost::shared_ptr<FileDownloader> uploader =
+ createDownloader(deleter, uploaderPort, uploaderPath, model, exceptionRethrower);
- std::thread uploaderThread( [uploader] () { uploader->runEventLoop(); });
- std::thread downloaderThread( [downloader] () { downloader->runEventLoop(); });
+ boost::thread uploaderThread(
+ boost::lambda::bind(&FileDownloader::runEventLoop, uploader.get()));
+
+ boost::thread downloaderThread(
+ boost::lambda::bind(&FileDownloader::runEventLoop, downloader.get()));
uploader->addTorrent(fileReference, buffer);
downloader->addTorrent(fileReference, buffer);
sleep(5);
BOOST_CHECK(fs::exists(downloaderPath / fileReference / fileToSend));
+ BOOST_CHECK(!exceptionRethrower->exceptionStored());
uploaderThread.interrupt();
uploaderThread.join();
diff --git a/filedistribution/src/tests/lib/mock-zookeeper.cpp b/filedistribution/src/tests/lib/mock-zookeeper.cpp
index 4d39e41786a..82cd03a268e 100644
--- a/filedistribution/src/tests/lib/mock-zookeeper.cpp
+++ b/filedistribution/src/tests/lib/mock-zookeeper.cpp
@@ -7,8 +7,7 @@
#include <cstring>
#include <vector>
-#include <thread>
-#include <atomic>
+#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
@@ -56,9 +55,7 @@ struct Node {
void triggerWatches(zhandle_t* zh, const std::string& path);
};
-std::shared_ptr<Node> sharedRoot;
-
-void doNothing() { }
+boost::shared_ptr<Node> sharedRoot;
struct ZHandle {
struct Worker {
@@ -71,12 +68,11 @@ struct ZHandle {
int sequence;
- std::shared_ptr<Node> root;
- std::atomic<bool> _closed;
- std::thread _watchersThread;
+ boost::shared_ptr<Node> root;
+ boost::thread _watchersThread;
vector<string> ephemeralNodes;
- typedef std::function<void (void)> InvokeWatcherFun;
+ typedef boost::function<void (void)> InvokeWatcherFun;
ConcurrentQueue<InvokeWatcherFun> watcherInvocations;
Node& getNode(const string& path);
@@ -87,7 +83,7 @@ struct ZHandle {
ephemeralNodes.push_back(path);
}
- ZHandle() : sequence(0), _closed(false), _watchersThread(Worker(this)) {
+ ZHandle() : sequence(0), _watchersThread(Worker(this)) {
if (!sharedRoot)
sharedRoot.reset(new Node());
@@ -96,21 +92,21 @@ struct ZHandle {
~ZHandle() {
std::for_each(ephemeralNodes.begin(), ephemeralNodes.end(),
- [this] (const string & s) { zoo_delete((zhandle_t*)this, s.c_str(), 0); });
- close();
+ boost::bind(&zoo_delete, (zhandle_t*)this,
+ boost::bind(&string::c_str, _1),
+ 0));
+
+ _watchersThread.interrupt();
_watchersThread.join();
}
- void close() {
- _closed.store(true);
- watcherInvocations.push(std::ref(doNothing));
- }
};
void
ZHandle::Worker::operator()()
{
- while (! zhandle._closed.load()) {
+ while (!boost::this_thread::interruption_requested()) {
InvokeWatcherFun fun = zhandle.watcherInvocations.pop();
+ boost::this_thread::disable_interruption di;
fun();
}
}
@@ -138,7 +134,10 @@ ZHandle::getParent(const string& childPath)
void
Node::triggerWatches(zhandle_t* zh, const std::string& path) {
for (auto i = watchers.begin(); i != watchers.end(); ++i) {
- ((ZHandle*)zh)->watcherInvocations.push([zh, i, path] () { i->first(zh, 0, 0, path.c_str(), i->second); });
+ ((ZHandle*)zh)->watcherInvocations.push(boost::bind(i->first, zh, \
+ /*TODO: type, state*/ 0, 0,
+ boost::bind(&string::c_str, path),
+ i->second));
}
watchers.clear();
}
diff --git a/filedistribution/src/tests/rpc/testfileprovider.cpp b/filedistribution/src/tests/rpc/testfileprovider.cpp
index 6be172d0afd..6881eb96b8a 100644
--- a/filedistribution/src/tests/rpc/testfileprovider.cpp
+++ b/filedistribution/src/tests/rpc/testfileprovider.cpp
@@ -17,8 +17,8 @@ const std::string MockFileProvider::_queueForeverFileReference("queue-forever");
BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) {
const std::string spec("tcp/localhost:9111");
- fd::FileProvider::SP provider(new fd::MockFileProvider());
- fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
+ boost::shared_ptr<fd::MockFileProvider> provider(new fd::MockFileProvider());
+ boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
fileDistributorRPC->start();
frtstream::FrtClientStream rpc(spec);
@@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) {
//must be run through valgrind
BOOST_AUTO_TEST_CASE(require_that_queued_requests_does_not_leak_memory) {
const std::string spec("tcp/localhost:9111");
- std::shared_ptr<MockFileProvider> provider(new MockFileProvider());
- fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
+ boost::shared_ptr<MockFileProvider> provider(new MockFileProvider());
+ boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
fileDistributorRPC->start();
FRT_Supervisor supervisor;
diff --git a/filedistribution/src/tests/scheduler/test-scheduler.cpp b/filedistribution/src/tests/scheduler/test-scheduler.cpp
index a9249bbdcae..cc669690a31 100644
--- a/filedistribution/src/tests/scheduler/test-scheduler.cpp
+++ b/filedistribution/src/tests/scheduler/test-scheduler.cpp
@@ -9,10 +9,8 @@
#include <iostream>
#include <boost/thread/barrier.hpp>
-#include <thread>
using filedistribution::Scheduler;
-using namespace std::literals;
namespace asio = boost::asio;
@@ -27,11 +25,13 @@ struct CallRun {
{}
void operator()(asio::io_service& ioService) {
- try {
- //No reset needed after handling exceptions.
- ioService.run();
- } catch(const TestException& e ) {
- _caughtException = true;
+ while (!boost::this_thread::interruption_requested()) {
+ try {
+ //No reset needed after handling exceptions.
+ ioService.run();
+ } catch(const TestException& e ) {
+ _caughtException = true;
+ }
}
}
};
@@ -41,7 +41,7 @@ struct Fixture {
Scheduler scheduler;
Fixture()
- : scheduler(std::ref(callRun))
+ : scheduler(boost::ref(callRun))
{}
};
@@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(require_exception_from_tasks_can_be_caught) {
task->scheduleNow();
for (int i=0; i<200 && !callRun._caughtException; ++i) {
- std::this_thread::sleep_for(100ms);
+ boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
}
BOOST_CHECK(callRun._caughtException);
diff --git a/filedistribution/src/tests/status/test-status.cpp b/filedistribution/src/tests/status/test-status.cpp
index 4fbda2cb9c3..7021752f316 100644
--- a/filedistribution/src/tests/status/test-status.cpp
+++ b/filedistribution/src/tests/status/test-status.cpp
@@ -3,7 +3,9 @@
#define BOOST_TEST_MAIN
#include <vespa/fastos/fastos.h>
#include <boost/test/unit_test.hpp>
+#include <boost/foreach.hpp>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
#include <vespa/filedistribution/model/zkfacade.h>
#include <vespa/filedistribution/model/filedistributionmodel.h>
#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
diff --git a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
index ada601742db..d45e5059a53 100644
--- a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
+++ b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
@@ -8,6 +8,7 @@
#include <iostream>
#include <boost/thread/barrier.hpp>
+#include <boost/thread/thread.hpp>
#include <boost/checked_delete.hpp>
#include <vespa/filedistribution/common/componentsdeleter.h>
@@ -16,7 +17,7 @@
#include <zookeeper/zookeeper.h>
-using namespace std::literals;
+
using namespace filedistribution;
namespace {
@@ -34,13 +35,16 @@ struct Watcher : public ZKFacade::NodeChangedWatcher {
};
struct Fixture {
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
ComponentsDeleter _componentsDeleter;
- std::shared_ptr<ZKFacade> zk;
+ boost::shared_ptr<ZKFacade> zk;
ZKFacade::Path testNode;
Fixture() {
+ _exceptionRethrower.reset(new ExceptionRethrower());
+
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181"));
+ zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower));
testNode = "/test-node";
zk->removeIfExists(testNode);
@@ -70,7 +74,7 @@ BOOST_AUTO_TEST_CASE(hasNode)
BOOST_AUTO_TEST_CASE(hasNodeNotification)
{
- std::shared_ptr<Watcher> watcher(new Watcher);
+ boost::shared_ptr<Watcher> watcher(new Watcher);
zk->hasNode(testNode, watcher);
zk->setData(testNode, "", 0);
@@ -78,7 +82,7 @@ BOOST_AUTO_TEST_CASE(hasNodeNotification)
//after the notification has returned, the watcher must no longer reside in watchers map.
for (int i=0; i<20 && !watcher.unique(); ++i) {
- std::this_thread::sleep_for(100ms);
+ boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
}
BOOST_CHECK(watcher.unique());
}
@@ -152,7 +156,8 @@ BOOST_AUTO_TEST_CASE(addEphemeralNode)
zk->removeIfExists(ephemeralNode);
//Checked deleter is ok here since we're not installing any watchers
- ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181"), boost::checked_deleter<ZKFacade>());
+ ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181", _exceptionRethrower),
+ boost::checked_deleter<ZKFacade>());
zk2->addEphemeralNode(ephemeralNode);
BOOST_CHECK(zk->hasNode(ephemeralNode));
@@ -164,7 +169,7 @@ BOOST_AUTO_TEST_CASE(addEphemeralNode)
BOOST_AUTO_TEST_CASE(dataChangedNotification)
{
- std::shared_ptr<Watcher> watcher(new Watcher);
+ boost::shared_ptr<Watcher> watcher(new Watcher);
zk->setData(testNode, "", 0);
Buffer buffer(zk->getData(testNode, watcher));
@@ -177,7 +182,7 @@ BOOST_AUTO_TEST_CASE(dataChangedNotification)
BOOST_AUTO_TEST_CASE(getChildrenNotification)
{
- std::shared_ptr<Watcher> watcher(new Watcher);
+ boost::shared_ptr<Watcher> watcher(new Watcher);
zk->setData(testNode, "", 0);
zk->getChildren(testNode, watcher);
@@ -189,9 +194,9 @@ BOOST_AUTO_TEST_CASE(getChildrenNotification)
BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback)
{
struct DeleteZKFacadeWatcher : public Watcher {
- std::shared_ptr<ZKFacade> _zk;
+ boost::shared_ptr<ZKFacade> _zk;
- DeleteZKFacadeWatcher(const std::shared_ptr<ZKFacade>& zk)
+ DeleteZKFacadeWatcher(const boost::shared_ptr<ZKFacade>& zk)
:_zk(zk)
{}
@@ -202,7 +207,7 @@ BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback)
}
};
- std::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk));
+ boost::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk));
zk->setData(testNode, "", 0);
zk->getData(testNode, watcher);
diff --git a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
index 6a3a87aac96..b385949bb98 100644
--- a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
+++ b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
@@ -7,6 +7,10 @@
#include <iostream>
+#include <boost/thread/barrier.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/checked_delete.hpp>
+
#include <vespa/filedistribution/common/componentsdeleter.h>
#include <vespa/filedistribution/model/zkfacade.h>
#include <vespa/filedistribution/model/zkfiledbmodel.h>
@@ -22,13 +26,16 @@ namespace {
struct Fixture {
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
ComponentsDeleter _componentsDeleter;
- std::shared_ptr<ZKFacade> zk;
- std::shared_ptr<ZKFileDBModel> model;
+ boost::shared_ptr<ZKFacade> zk;
+ boost::shared_ptr<ZKFileDBModel> model;
Fixture() {
+ _exceptionRethrower.reset(new ExceptionRethrower());
+
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181"));
+ zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower));
zk->setData("/vespa", "", 0);
model = _componentsDeleter.track(new ZKFileDBModel(zk));
diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
index 3c6a265941a..74b36b77a72 100644
--- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
+++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
@@ -5,7 +5,9 @@
#include <vespa/log/log.h>
LOG_SETUP(".componentsdeleter");
-using namespace std::literals;
+#include <boost/foreach.hpp>
+
+
using namespace filedistribution;
struct ComponentsDeleter::Worker {
@@ -21,21 +23,25 @@ struct ComponentsDeleter::Worker {
void
ComponentsDeleter::Worker::operator()()
{
- while ( ! _parent.areWeDone() ) {
- CallDeleteFun deleteFun = _parent._deleteRequests.pop();
- deleteFun();
+ while (!boost::this_thread::interruption_requested()) {
+ try {
+ CallDeleteFun deleteFun = _parent._deleteRequests.pop();
+ boost::this_thread::disable_interruption di;
+ deleteFun();
+ } catch(const std::exception& e) {
+ LOG(error, e.what());
+ }
}
}
-ComponentsDeleter::ComponentsDeleter() :
- _closed(false),
- _deleterThread(Worker(this))
+ComponentsDeleter::ComponentsDeleter()
+ :_deleterThread(Worker(this))
{}
ComponentsDeleter::~ComponentsDeleter()
{
- close();
waitForAllComponentsDeleted();
+ _deleterThread.interrupt();
_deleterThread.join();
}
@@ -44,29 +50,31 @@ ComponentsDeleter::waitForAllComponentsDeleted()
{
LOG(debug, "Waiting for all components to be deleted");
- for (int i=0; i<600 && !areWeDone(); ++i) {
- std::this_thread::sleep_for(100ms);
+ for (int i=0; i<600 && !allComponentsDeleted(); ++i) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}
LOG(debug, "Done waiting for all components to be deleted");
- assert(_trackedComponents.empty());
- assert(_deleteRequests.empty());
+
+ logNotDeletedComponents();
+
+ if (!allComponentsDeleted())
+ kill(getpid(), SIGKILL);
}
-
-void
-ComponentsDeleter::close()
+
+bool
+ComponentsDeleter::allComponentsDeleted()
{
- {
- LockGuard guard(_trackedComponentsMutex);
- _closed = true;
- }
- _deleteRequests.push([]() { LOG(debug, "I am the last one, hurry up and shutdown"); });
+ LockGuard guard(_trackedComponentsMutex);
+ return _trackedComponents.empty();
}
-bool
-ComponentsDeleter::areWeDone()
+void
+ComponentsDeleter::logNotDeletedComponents()
{
LockGuard guard(_trackedComponentsMutex);
- return _closed && _trackedComponents.empty() && _deleteRequests.empty();
+ BOOST_FOREACH(TrackedComponentsMap::value_type component, _trackedComponents) {
+ LOG(info, "Timed out waiting for component '%s' to be deleted", component.second.c_str());
+ }
}
void
diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
index 46d79663536..4238f88a05e 100644
--- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
+++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
@@ -4,8 +4,12 @@
#include <map>
#include <typeinfo>
#include <string>
-#include <mutex>
-#include <thread>
+
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <boost/checked_delete.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread.hpp>
#include "concurrentqueue.h"
@@ -18,16 +22,16 @@ namespace filedistribution {
*/
class ComponentsDeleter {
class Worker;
- typedef std::lock_guard<std::mutex> LockGuard;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
- std::mutex _trackedComponentsMutex;
+ boost::mutex _trackedComponentsMutex;
typedef std::map<void*, std::string> TrackedComponentsMap;
TrackedComponentsMap _trackedComponents;
- typedef std::function<void (void)> CallDeleteFun;
+ typedef boost::function<void (void)> CallDeleteFun;
ConcurrentQueue<CallDeleteFun> _deleteRequests;
- bool _closed;
- std::thread _deleterThread;
+
+ boost::thread _deleterThread;
void removeFromTrackedComponents(void* component);
@@ -39,12 +43,12 @@ class ComponentsDeleter {
template <class T>
void requestDelete(T* component) {
- _deleteRequests.push([this, component]() { deleteComponent<T>(component); });
+ _deleteRequests.push(boost::bind(&ComponentsDeleter::deleteComponent<T>, this, component));
}
void waitForAllComponentsDeleted();
- bool areWeDone();
- void close();
+ bool allComponentsDeleted();
+ void logNotDeletedComponents();
public:
ComponentsDeleter(const ComponentsDeleter &) = delete;
ComponentsDeleter & operator = (const ComponentsDeleter &) = delete;
@@ -57,14 +61,11 @@ class ComponentsDeleter {
~ComponentsDeleter();
template <class T>
- std::shared_ptr<T> track(T* t) {
+ boost::shared_ptr<T> track(T* t) {
LockGuard guard(_trackedComponentsMutex);
- if (_closed) {
- return std::shared_ptr<T>(t);
- }
_trackedComponents[t] = typeid(t).name();
- return std::shared_ptr<T>(t, [this](T * p) { requestDelete<T>(p); });
+ return boost::shared_ptr<T>(t, boost::bind(&ComponentsDeleter::requestDelete<T>, this, t));
}
};
}
diff --git a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
index 21b8ade0ab0..056ba3153a2 100644
--- a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
+++ b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
@@ -3,8 +3,9 @@
#include <queue>
-#include <mutex>
-#include <condition_variable>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
namespace filedistribution {
@@ -13,10 +14,10 @@ class ConcurrentQueue {
public:
typedef T value_type;
private:
- std::condition_variable _nonEmpty;
+ boost::condition_variable _nonEmpty;
- mutable std::mutex _queueMutex;
- typedef std::unique_lock<std::mutex> UniqueLock;
+ mutable boost::mutex _queueMutex;
+ typedef boost::unique_lock<boost::mutex> UniqueLock;
std::queue<value_type> _queue;
@@ -46,10 +47,6 @@ public:
_queue.pop();
}
}
- bool empty() {
- UniqueLock guard(_queueMutex);
- return _queue.empty();
- }
};
} //namespace filedistribution
diff --git a/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h
new file mode 100644
index 00000000000..28b45546a64
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/exception_ptr.hpp>
+#include <boost/type_traits/is_polymorphic.hpp>
+
+namespace filedistribution {
+
+//used for rethrowing an exceptions in a different context
+class ExceptionRethrower {
+ boost::exception_ptr _exceptionPtr; //not a pod, default constructed to null value
+
+ mutable boost::mutex _exceptionMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+
+public:
+ void rethrow() const {
+ LockGuard guard(_exceptionMutex);
+
+ if (_exceptionPtr)
+ boost::rethrow_exception(_exceptionPtr);
+ }
+
+ bool exceptionStored() const {
+ LockGuard guard(_exceptionMutex);
+ return _exceptionPtr;
+ }
+
+ template <class T>
+ void store(const T& exception) {
+ boost::exception_ptr exceptionPtr = boost::copy_exception(exception);
+ store(exceptionPtr);
+ }
+
+ void store(const boost::exception_ptr exceptionPtr) {
+ LockGuard guard(_exceptionMutex);
+
+ if (!_exceptionPtr) //only store the first exception to be rethrowed.
+ _exceptionPtr = exceptionPtr;
+ }
+};
+
+} //namespace filedistribution
+
+
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
index 7eb0ab957ff..055a72e26b3 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
@@ -15,7 +15,7 @@ using filedistribution::FileDistributorTrackerImpl;
using filedistribution::FileDownloader;
using filedistribution::FileDistributionModel;
using filedistribution::Scheduler;
-using filedistribution::TorrentSP;
+using filedistribution::ExceptionRethrower;
typedef FileDistributionModel::PeerEntries PeerEntries;
@@ -60,14 +60,14 @@ struct TrackingTask : public Scheduler::Task {
libtorrent::tracker_request _trackerRequest;
boost::weak_ptr<libtorrent::torrent> _torrent;
- std::weak_ptr<FileDownloader> _downloader;
- std::shared_ptr<FileDistributionModel> _model;
+ boost::weak_ptr<FileDownloader> _downloader;
+ boost::shared_ptr<FileDistributionModel> _model;
TrackingTask(Scheduler& scheduler,
const libtorrent::tracker_request& trackerRequest,
- const TorrentSP & torrent,
- const std::weak_ptr<FileDownloader>& downloader,
- const std::shared_ptr<FileDistributionModel>& model)
+ const boost::shared_ptr<libtorrent::torrent>& torrent,
+ const boost::weak_ptr<FileDownloader>& downloader,
+ const boost::shared_ptr<FileDistributionModel>& model)
: Task(scheduler),
_numTimesRescheduled(0),
_trackerRequest(trackerRequest),
@@ -78,12 +78,12 @@ struct TrackingTask : public Scheduler::Task {
//TODO: refactor
void doHandle() {
- if (std::shared_ptr<FileDownloader> downloader = _downloader.lock()) {
+ if (boost::shared_ptr<FileDownloader> downloader = _downloader.lock()) {
//All torrents must be destructed before the session is destructed.
//It's okay to prevent the torrent from expiring here
//since the session can't be destructed while
//we hold a shared_ptr to the downloader.
- if (TorrentSP torrent = _torrent.lock()) {
+ if (boost::shared_ptr<libtorrent::torrent> torrent = _torrent.lock()) {
PeerEntries peers = getPeers(downloader);
if (!peers.empty()) {
@@ -108,7 +108,7 @@ struct TrackingTask : public Scheduler::Task {
}
}
- PeerEntries getPeers(const std::shared_ptr<FileDownloader>& downloader) {
+ PeerEntries getPeers(const boost::shared_ptr<FileDownloader>& downloader) {
std::string fileReference = downloader->infoHash2FileReference(_trackerRequest.info_hash);
const size_t recommendedMaxNumberOfPeers = 30;
@@ -134,12 +134,34 @@ struct TrackingTask : public Scheduler::Task {
}
};
+
+void
+workerFunction(boost::shared_ptr<ExceptionRethrower> exceptionRethrower, asio::io_service& ioService)
+{
+ while (!boost::this_thread::interruption_requested()) {
+ try {
+ //No reset needed after handling exceptions.
+ ioService.run();
+ } catch(const boost::thread_interrupted&) {
+ LOG(debug, "Tracker worker thread interrupted.");
+ throw;
+ } catch(...) {
+ exceptionRethrower->store(boost::current_exception());
+ }
+ }
+}
+
} //anonymous namespace
-FileDistributorTrackerImpl::FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model) :
+
+FileDistributorTrackerImpl::FileDistributorTrackerImpl(
+ const boost::shared_ptr<FileDistributionModel>& model,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
+ :_exceptionRethrower(exceptionRethrower),
_model(model)
{}
+
FileDistributorTrackerImpl::~FileDistributorTrackerImpl() {
LOG(debug, "Deconstructing FileDistributorTrackerImpl");
@@ -147,23 +169,25 @@ FileDistributorTrackerImpl::~FileDistributorTrackerImpl() {
_scheduler.reset();
}
+
void
FileDistributorTrackerImpl::trackingRequest(
libtorrent::tracker_request& request,
- const TorrentSP & torrent)
+ const boost::shared_ptr<libtorrent::torrent> & torrent)
{
LockGuard guard(_mutex);
- if (torrent != TorrentSP()) {
- std::shared_ptr<TrackingTask> trackingTask(new TrackingTask(
+ if (torrent != boost::shared_ptr<libtorrent::torrent>()) {
+ boost::shared_ptr<TrackingTask> trackingTask(new TrackingTask(
*_scheduler.get(), request, torrent, _downloader, _model));
trackingTask->scheduleNow();
}
}
+
void
-FileDistributorTrackerImpl::setDownloader(const std::shared_ptr<FileDownloader>& downloader)
+FileDistributorTrackerImpl::setDownloader(const boost::shared_ptr<FileDownloader>& downloader)
{
LockGuard guard(_mutex);
@@ -171,6 +195,6 @@ FileDistributorTrackerImpl::setDownloader(const std::shared_ptr<FileDownloader>&
_downloader = downloader;
if (downloader) {
- _scheduler.reset(new Scheduler([] (asio::io_service& ioService) { ioService.run(); }));
+ _scheduler.reset(new Scheduler(boost::bind(&workerFunction, _exceptionRethrower, _1)));
}
}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
index bf72a2b80df..edbdb9b8943 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
@@ -4,38 +4,41 @@
#include <libtorrent/session.hpp>
#include <libtorrent/torrent.hpp>
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
#include "scheduler.h"
-#include <mutex>
namespace filedistribution {
class FileDistributionModel;
class FileDownloader;
-using TorrentSP = boost::shared_ptr<libtorrent::torrent>;
-
class FileDistributorTrackerImpl : public FileDistributionTracker {
- const std::shared_ptr<FileDistributionModel> _model;
+ const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ const boost::shared_ptr<FileDistributionModel> _model;
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _mutex;
- std::weak_ptr<FileDownloader> _downloader;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _mutex;
+ boost::weak_ptr<FileDownloader> _downloader;
//Use separate worker thread to avoid potential deadlock
//between tracker requests and files to download changed requests.
boost::scoped_ptr<Scheduler> _scheduler;
public:
- FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model);
+ FileDistributorTrackerImpl(const boost::shared_ptr<FileDistributionModel>& model,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower);
virtual ~FileDistributorTrackerImpl();
//overrides
- void trackingRequest(libtorrent::tracker_request& request, const TorrentSP & torrent);
+ void trackingRequest(libtorrent::tracker_request& request,
+ const boost::shared_ptr<libtorrent::torrent> & torrent);
- void setDownloader(const std::shared_ptr<FileDownloader>& downloader);
+ void setDownloader(const boost::shared_ptr<FileDownloader>& downloader);
};
} //namespace filedistribution
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
index 7d5d7acceb2..546ae8028f8 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
@@ -9,7 +9,11 @@
#include <boost/filesystem.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/convenience.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
#include <boost/function_output_iterator.hpp>
+#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
#include <libtorrent/alert.hpp>
#include <libtorrent/alert_types.hpp>
@@ -172,9 +176,11 @@ struct FileDownloader::EventHandler
void operator()(const libtorrent::save_resume_data_alert& alert) const {
defaultHandler(alert);
- fs::ofstream resumeFile(resumeDataPathTemp(alert.handle), std::ios_base::binary);
+ fs::ofstream resumeFile(resumeDataPathTemp(alert.handle),
+ std::ios_base::binary);
resumeFile.unsetf(std::ios_base::skipws);
- libtorrent::bencode(std::ostream_iterator<char>(resumeFile), *alert.resume_data);
+ libtorrent::bencode(std::ostream_iterator<char>(resumeFile),
+ *alert.resume_data);
resumeFile.close();
fs::rename(resumeDataPathTemp(alert.handle), resumeDataPath(alert.handle));
_fileDownloader.didReceiveSRD();
@@ -203,14 +209,15 @@ FileDownloader::LogSessionDeconstructed::~LogSessionDeconstructed()
LOG(debug, "Libtorrent session closed successfully.");
}
-FileDownloader::FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker,
+FileDownloader::FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker,
const std::string& hostName, int port,
- const fs::path& dbPath)
+ const fs::path& dbPath,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
: _outstanding_SRD_requests(0),
_tracker(tracker),
_session(tracker.get(), libtorrent::fingerprint("vp", 0, 0, 0, 0), 0),
- _closed(false),
_dbPath(dbPath),
+ _exceptionRethrower(exceptionRethrower),
_hostName(hostName),
_port(port)
{
@@ -351,17 +358,20 @@ FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain
LockGuard guard(_modifyTorrentsDownloadingMutex);
std::set<std::string> currentFiles;
+ namespace ll = boost::lambda;
+
std::set<sha1_hash> infoHashesToRetain;
- for (const std::string& fileReference : filesToRetain) {
+ BOOST_FOREACH(const std::string& fileReference, filesToRetain) {
infoHashesToRetain.insert(toInfoHash(fileReference));
}
std::vector<torrent_handle> torrents = _session.get_torrents();
- for (torrent_handle torrent : torrents) {
+ BOOST_FOREACH(torrent_handle torrent, torrents) {
if (!infoHashesToRetain.count(torrent.info_hash())) {
LOG(info, "Removing torrent: '%s' with file reference '%s'",
- getMainName(torrent).c_str(), fileReferenceToString(torrent.info_hash()).c_str());
+ getMainName(torrent).c_str(),
+ fileReferenceToString(torrent.info_hash()).c_str());
deleteTorrentData(torrent, guard);
_session.remove_torrent(torrent);
@@ -372,26 +382,20 @@ FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain
void FileDownloader::runEventLoop() {
EventHandler eventHandler(this);
- while ( ! closed() ) {
- if (_session.wait_for_alert(libtorrent::milliseconds(100))) {
- std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
- eventHandler.handle(std::move(alert));
+ try {
+ while (!boost::this_thread::interruption_requested()) {
+ if (_session.wait_for_alert(libtorrent::milliseconds(100))) {
+ std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
+ eventHandler.handle(std::move(alert));
+ }
}
+ } catch(const boost::thread_interrupted&) {
+ LOG(spam, "The FileDownloader thread was interrupted.");
+ } catch(...) {
+ _exceptionRethrower->store(boost::current_exception());
}
}
-bool
-FileDownloader::closed() const
-{
- return _closed.load();
-}
-
-void
-FileDownloader::close()
-{
- _closed.store(true);
-}
-
void
FileDownloader::signalIfFinishedDownloading(const std::string& fileReference) {
boost::optional<fs::path> path = pathToCompletedFile(fileReference);
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
index 38de8ac4357..9056f437664 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
@@ -2,7 +2,7 @@
#pragma once
#include <vector>
-#include <mutex>
+#include <boost/thread/mutex.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/optional.hpp>
#include <boost/multi_index_container.hpp>
@@ -15,6 +15,7 @@
#include <vespa/filedistribution/rpc/fileprovider.h>
#include "hostname.h"
#include <vespa/filedistribution/common/buffer.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
#include <vespa/filedistribution/common/exception.h>
#include <vespa/filedistribution/model/filedbmodel.h>
@@ -39,15 +40,14 @@ class FileDownloader
};
size_t _outstanding_SRD_requests;
- std::shared_ptr<FileDistributionTracker> _tracker;
+ boost::shared_ptr<FileDistributionTracker> _tracker;
- std::mutex _modifyTorrentsDownloadingMutex;
- typedef std::lock_guard<std::mutex> LockGuard;
+ boost::mutex _modifyTorrentsDownloadingMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
LogSessionDeconstructed _logSessionDeconstructed;
//session is safe to use from multiple threads.
libtorrent::session _session;
- std::atomic<bool> _closed;
const boost::filesystem::path _dbPath;
typedef std::vector<char> ResumeDataBuffer;
@@ -65,9 +65,10 @@ public:
typedef FileProvider::DownloadCompletedSignal DownloadCompletedSignal;
typedef FileProvider::DownloadFailedSignal DownloadFailedSignal;
- FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker,
+ FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker,
const std::string& hostName, int port,
- const boost::filesystem::path& dbPath);
+ const boost::filesystem::path& dbPath,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower);
~FileDownloader();
DirectoryGuard::UP getGuard() { return std::make_unique<DirectoryGuard>(_dbPath); }
@@ -82,8 +83,8 @@ public:
std::string infoHash2FileReference(const libtorrent::sha1_hash& hash);
void setMaxDownloadSpeed(double MBPerSec);
void setMaxUploadSpeed(double MBPerSec);
- void close();
- bool closed() const;
+
+ const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
const std::string _hostName;
const int _port;
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
index bf17b1bc8d1..7bc57c57dd1 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
@@ -7,12 +7,14 @@ LOG_SETUP(".filedownloadermanager");
#include <iterator>
#include <sstream>
-#include <thread>
-
-using namespace std::literals;
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/thread.hpp>
using filedistribution::FileDownloaderManager;
+namespace lambda = boost::lambda;
+
namespace {
void logStartDownload(const std::set<std::string> & filesToDownload) {
std::ostringstream msg;
@@ -24,8 +26,8 @@ void logStartDownload(const std::set<std::string> & filesToDownload) {
} //anonymous namespace
FileDownloaderManager::FileDownloaderManager(
- const std::shared_ptr<FileDownloader>& downloader,
- const std::shared_ptr<FileDistributionModel>& model)
+ const boost::shared_ptr<FileDownloader>& downloader,
+ const boost::shared_ptr<FileDistributionModel>& model)
:_fileDownloader(downloader),
_fileDistributionModel(model),
@@ -40,14 +42,20 @@ FileDownloaderManager::~FileDownloaderManager() {
void
FileDownloaderManager::start()
{
- _downloadFailedConnection = downloadFailed().connect(
- DownloadFailedSignal::slot_type([&] (const std::string & peer, FileProvider::FailedDownloadReason reason) { (void) reason; removePeerStatus(peer); }).track_foreign(shared_from_this()));
-
- _downloadCompletedConnection = downloadCompleted().connect(
- DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus).track_foreign(shared_from_this()));
-
- _filesToDownloadChangedConnection = _fileDistributionModel->_filesToDownloadChanged.connect(
- FileDistributionModel::FilesToDownloadChangedSignal::slot_type(std::ref(_startDownloads)).track_foreign(shared_from_this()));
+ _downloadFailedConnection =
+ downloadFailed().connect(
+ DownloadFailedSignal::slot_type(lambda::bind(&FileDownloaderManager::removePeerStatus, this, lambda::_1)).
+ track(shared_from_this()));
+
+ _downloadCompletedConnection =
+ downloadCompleted().connect(
+ DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus).
+ track(shared_from_this()));
+
+ _filesToDownloadChangedConnection =
+ _fileDistributionModel->_filesToDownloadChanged.connect(
+ FileDistributionModel::FilesToDownloadChangedSignal::slot_type(boost::ref(_startDownloads)).
+ track(shared_from_this()));
}
boost::optional< boost::filesystem::path >
@@ -91,6 +99,7 @@ FileDownloaderManager::StartDownloads::downloadFile(const std::string& fileRefer
void
FileDownloaderManager::StartDownloads::operator()() {
+ namespace ll = boost::lambda;
DirectoryGuard::UP guard = _parent._fileDownloader->getGuard();
LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex);
@@ -99,7 +108,7 @@ FileDownloaderManager::StartDownloads::operator()() {
logStartDownload(filesToDownload);
std::for_each(filesToDownload.begin(), filesToDownload.end(),
- [&] (const std::string& file) { downloadFile(file); });
+ ll::bind(&StartDownloads::downloadFile, this, ll::_1));
_parent._fileDownloader->removeAllTorrentsBut(filesToDownload);
}
@@ -126,7 +135,7 @@ FileDownloaderManager::SetFinishedDownloadingStatus::operator()(
} catch(const FileDistributionModel::NotPeer&) { //Probably a concurrent removal of the torrent.
//improve chance of libtorrent session being updated.
- std::this_thread::sleep_for(100ms);
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
if (_parent._fileDownloader->hasTorrent(fileReference)) {
_parent._fileDistributionModel->addPeer(fileReference);
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
index 1294f7d7f77..f99888c5a26 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
@@ -1,7 +1,9 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <boost/thread/mutex.hpp>
#include <boost/signals2/signal.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <vespa/filedistribution/rpc/fileprovider.h>
#include <vespa/filedistribution/model/filedistributionmodel.h>
@@ -10,7 +12,7 @@
namespace filedistribution {
class FileDownloaderManager : public FileProvider,
- public std::enable_shared_from_this<FileDownloaderManager> {
+ public boost::enable_shared_from_this<FileDownloaderManager> {
class StartDownloads {
FileDownloaderManager& _parent;
@@ -27,11 +29,11 @@ class FileDownloaderManager : public FileProvider,
SetFinishedDownloadingStatus(FileDownloaderManager*);
};
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _updateFilesToDownloadMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _updateFilesToDownloadMutex;
- std::shared_ptr<FileDownloader> _fileDownloader;
- std::shared_ptr<FileDistributionModel> _fileDistributionModel;
+ boost::shared_ptr<FileDownloader> _fileDownloader;
+ boost::shared_ptr<FileDistributionModel> _fileDistributionModel;
StartDownloads _startDownloads;
SetFinishedDownloadingStatus _setFinishedDownloadingStatus;
@@ -41,11 +43,10 @@ class FileDownloaderManager : public FileProvider,
void removePeerStatus(const std::string& fileReference);
public:
- using SP = std::shared_ptr<FileDownloaderManager>;
FileDownloaderManager(const FileDownloaderManager &) = delete;
FileDownloaderManager & operator = (const FileDownloaderManager &) = delete;
- FileDownloaderManager(const std::shared_ptr<FileDownloader>&,
- const std::shared_ptr<FileDistributionModel>& model);
+ FileDownloaderManager(const boost::shared_ptr<FileDownloader>&,
+ const boost::shared_ptr<FileDistributionModel>& model);
~FileDownloaderManager();
void start();
diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
index 4f75afb4850..1ae0b0b1f95 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
@@ -1,6 +1,9 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/fastos/fastos.h>
#include "scheduler.h"
+
+#include <boost/bind.hpp>
+
#include <iostream>
namespace asio = boost::asio;
@@ -16,8 +19,7 @@ void
Task::schedule(asio::deadline_timer::duration_type delay)
{
_timer.expires_from_now(delay);
- std::shared_ptr<Task> self = shared_from_this();;
- _timer.async_wait([self](const auto & e) { self->handle(e); });
+ _timer.async_wait(boost::bind(&Task::handle, shared_from_this(), _1));
}
void
@@ -34,13 +36,14 @@ Task::handle(const boost::system::error_code& code) {
}
-Scheduler::Scheduler(std::function<void (asio::io_service&)> callRun)
+Scheduler::Scheduler(boost::function<void (asio::io_service&)> callRun)
:_keepAliveWork(ioService),
- _workerThread([&, callRun]() { callRun(ioService); })
+ _workerThread(boost::bind(callRun, boost::ref(ioService)))
{}
Scheduler::~Scheduler() {
ioService.stop();
+ _workerThread.interrupt();
_workerThread.join();
ioService.reset();
}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
index c2eead235bf..9492a8977d7 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
+++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
@@ -3,17 +3,18 @@
#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
-#include <thread>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
namespace filedistribution {
class Scheduler {
public:
- class Task : public std::enable_shared_from_this<Task> {
+ class Task : public boost::enable_shared_from_this<Task> {
boost::asio::deadline_timer _timer;
public:
- typedef std::shared_ptr<Task> SP;
+ typedef boost::shared_ptr<Task> SP;
Task(Scheduler& scheduler);
@@ -33,12 +34,12 @@ private:
//keeps io_service.run() from exiting until it has been destructed,
//see http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/reference/io_service.html
boost::asio::io_service::work _keepAliveWork;
- std::thread _workerThread;
+ boost::thread _workerThread;
public:
Scheduler(const Scheduler &) = delete;
Scheduler & operator = (const Scheduler &) = delete;
- Scheduler(std::function<void (boost::asio::io_service&)> callRun) ;
+ Scheduler(boost::function<void (boost::asio::io_service&)> callRun) ;
~Scheduler();
};
diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
index fd54b65cdbc..001edd0e20a 100644
--- a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
+++ b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
@@ -10,6 +10,8 @@
#include <string>
#include <boost/filesystem/convenience.hpp>
+#include <boost/lambda/lambda.hpp>
+
#include "libtorrent/torrent_info.hpp"
namespace fs = boost::filesystem;
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
index 057902327a9..fb75d88e031 100644
--- a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
+++ b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
@@ -3,6 +3,8 @@
#include <vespa/filedistribution/manager/com_yahoo_vespa_filedistribution_FileDistributionManager.h>
#include <memory>
+#include <boost/lambda/lambda.hpp>
+
#include <vespa/filedistribution/model/filedistributionmodel.h>
#include <vespa/filedistribution/model/zkfiledbmodel.h>
#include <vespa/filedistribution/model/mockfiledistributionmodel.h>
@@ -87,7 +89,8 @@ void initMockFileDBModel(NativeFileDistributionManager& manager)
void initFileDBModel(NativeFileDistributionManager& manager, const std::string& zkServers)
{
//Ignored for now, since we're not installing any watchers.
- std::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers));
+ boost::shared_ptr<ExceptionRethrower> ignoredRethrower(new ExceptionRethrower());
+ boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers, ignoredRethrower));
manager._fileDBModel.reset(new ZKFileDBModel(zk));
}
} //end anonymous namespace
diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
index 1e80ff375a4..733d60d91bb 100644
--- a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
+++ b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
@@ -5,6 +5,9 @@
#include <sstream>
#include <iterator>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+
#include <vespa/filedistribution/common/logfwd.h>
using filedistribution::DeployedFilesToDownload;
@@ -34,7 +37,8 @@ DeployedFilesToDownload::addNewDeployNode(Path parentPath, const FileReferences&
std::ostringstream filesStream;
if (!files.empty()) {
filesStream << files[0];
- std::for_each(files.begin() +1, files.end(), [&](const auto & v) { filesStream << '\n' << v; });
+ std::for_each(files.begin() +1, files.end(),
+ filesStream <<boost::lambda::constant('\n') <<boost::lambda::_1);
}
Path retPath = _zk.createSequenceNode(path, filesStream.str().c_str(), filesStream.str().length());
return retPath;
@@ -70,7 +74,8 @@ DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath, StringVector
size_t numberOfNodesToDelete = children.size() - numberOfDeploysToKeepFiles;
std::for_each(children.begin(), children.begin() + numberOfNodesToDelete,
- [&](const std::string & s) {_zk.remove(parentPath / s); });
+ boost::lambda::bind(&ZKFacade::remove, &_zk,
+ boost::lambda::ret<Path>(parentPath / boost::lambda::_1)));
}
}
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
index adff69cfc6c..5b9de93249a 100644
--- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
+++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
@@ -8,6 +8,10 @@
#include <cstdlib>
#include <boost/filesystem.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
#include <zookeeper/zookeeper.h>
#include <vespa/log/log.h>
@@ -70,18 +74,18 @@ using filedistribution::FileDistributionModelImpl;
struct FileDistributionModelImpl::DeployedFilesChangedCallback :
public ZKFacade::NodeChangedWatcher
{
- typedef std::shared_ptr<DeployedFilesChangedCallback> SP;
+ typedef boost::shared_ptr<DeployedFilesChangedCallback> SP;
- std::weak_ptr<FileDistributionModelImpl> _parent;
+ boost::weak_ptr<FileDistributionModelImpl> _parent;
DeployedFilesChangedCallback(
- const std::shared_ptr<FileDistributionModelImpl> & parent)
+ const boost::shared_ptr<FileDistributionModelImpl> & parent)
:_parent(parent)
{}
//override
void operator()() {
- if (std::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) {
+ if (boost::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) {
model->_filesToDownloadChanged();
}
}
@@ -107,7 +111,9 @@ FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t max
PeerEntries result;
result.reserve(end - peers.begin());
- std::for_each(peers.begin(), end, [&] (const std::string & s) { addPeerEntry(s, result); });
+ namespace ll=boost::lambda;
+ std::for_each(peers.begin(), end,
+ ll::bind(&addPeerEntry, boost::lambda::_1, boost::ref(result)));
LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str());
return result;
@@ -223,8 +229,11 @@ FileDistributionModelImpl::addConfigServersAsPeers(
void
FileDistributionModelImpl::configure(std::unique_ptr<FilereferencesConfig> config) {
- const bool changed = updateActiveFileReferences(config->filereferences);
- if (changed) {
- _filesToDownloadChanged();
+ try {
+ const bool changed = updateActiveFileReferences(config->filereferences);
+ if (changed)
+ _filesToDownloadChanged();
+ } catch(...) {
+ _exceptionRethrower->store(boost::current_exception());
}
}
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
index 224009822e1..04a111a00df 100644
--- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
+++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
@@ -1,10 +1,13 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <boost/enable_shared_from_this.hpp>
+
#include "filedistributionmodel.h"
#include <vespa/filedistribution/model/config-filereferences.h>
#include "zkfacade.h"
#include "zkfiledbmodel.h"
+#include <vespa/filedistribution/common/exceptionrethrower.h>
#include <vespa/config/config.h>
using cloud::config::filedistribution::FilereferencesConfig;
@@ -13,30 +16,35 @@ namespace filedistribution {
class FileDistributionModelImpl : public FileDistributionModel,
public config::IFetcherCallback<FilereferencesConfig>,
- public std::enable_shared_from_this<FileDistributionModelImpl>
+ public boost::enable_shared_from_this<FileDistributionModelImpl>
{
struct DeployedFilesChangedCallback;
const std::string _hostName;
const int _port;
- const std::shared_ptr<ZKFacade> _zk;
+ const boost::shared_ptr<ZKFacade> _zk;
ZKFileDBModel _fileDBModel;
- std::mutex _activeFileReferencesMutex;
- typedef std::lock_guard<std::mutex> LockGuard;
+ boost::mutex _activeFileReferencesMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
std::vector<vespalib::string> _activeFileReferences;
+ const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+
bool /*changed*/
updateActiveFileReferences(const std::vector<vespalib::string>& fileReferences);
ZKFacade::Path getPeerEntryPath(const std::string& fileReference);
public:
- FileDistributionModelImpl(const std::string& hostName, int port, const std::shared_ptr<ZKFacade>& zk)
+ FileDistributionModelImpl(const std::string& hostName, int port,
+ const boost::shared_ptr<ZKFacade>& zk,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
:_hostName(hostName),
_port(port),
_zk(zk),
- _fileDBModel(_zk)
+ _fileDBModel(_zk),
+ _exceptionRethrower(exceptionRethrower)
{
/* Hack: Force the first call to updateActiveFileReferences to return changed=true
when the file references config is empty.
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
index 7e7caf67ff6..ecfb3ca7b44 100644
--- a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
+++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
@@ -9,16 +9,17 @@
#include <cassert>
#include <cstdio>
#include <sstream>
-#include <thread>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
#include <boost/throw_exception.hpp>
#include <boost/function_output_iterator.hpp>
+#include <boost/thread.hpp>
#include <zookeeper/zookeeper.h>
#include <vespa/filedistribution/common/logfwd.h>
#include <vespa/defaults.h>
-#include <vespa/vespalib/util/sync.h>
-typedef std::unique_lock<std::mutex> UniqueLock;
+typedef boost::unique_lock<boost::mutex> UniqueLock;
using filedistribution::ZKFacade;
using filedistribution::Move;
@@ -135,11 +136,11 @@ setDataForExistingFile(ZKFacade& zk, const Path& path, const char* buffer, int l
/********** Active watchers *******************************************/
struct ZKFacade::ZKWatcher {
- const std::weak_ptr<ZKFacade> _owner;
+ const boost::weak_ptr<ZKFacade> _owner;
const NodeChangedWatcherSP _nodeChangedWatcher;
ZKWatcher(
- const std::shared_ptr<ZKFacade> &owner,
+ const boost::shared_ptr<ZKFacade> &owner,
const NodeChangedWatcherSP& nodeChangedWatcher )
:_owner(owner),
_nodeChangedWatcher(nodeChangedWatcher)
@@ -169,7 +170,7 @@ struct ZKFacade::ZKWatcher {
//this will cause infinite waiting.
//To avoid this, a custom shared_ptr deleter using a separate deleter thread must be used.
- if (std::shared_ptr<ZKFacade> zk = self->_owner.lock()) {
+ if (boost::shared_ptr<ZKFacade> zk = self->_owner.lock()) {
zk->invokeWatcher(watcherContext);
}
@@ -180,15 +181,15 @@ struct ZKFacade::ZKWatcher {
void
ZKFacade::stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context) {
(void)path;
- (void)context;
//The ZKFacade won't expire before zookeeper_close has finished.
+ ZKFacade* self = (ZKFacade*)context;
if (type == ZOO_SESSION_EVENT) {
LOGFWD(debug, "Zookeeper session event: %d", state);
if (state == ZOO_EXPIRED_SESSION_STATE) {
- throw ZKSessionExpired();
+ self->_exceptionRethrower->store(ZKSessionExpired());
} else if (state == ZOO_AUTH_FAILED_STATE) {
- throw ZKGenericException(ZNOAUTH);
+ self->_exceptionRethrower->store(ZKGenericException(ZNOAUTH));
}
} else {
LOGFWD(info, "State watching function: Unexpected event: '%d' -- '%d' ", type, state);
@@ -200,21 +201,21 @@ void* /* watcherContext */
ZKFacade::registerWatcher(const NodeChangedWatcherSP& watcher) {
UniqueLock lock(_watchersMutex);
- std::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher));
+ boost::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher));
_watchers[zkWatcher.get()] = zkWatcher;
return zkWatcher.get();
}
-std::shared_ptr<ZKFacade::ZKWatcher>
+boost::shared_ptr<ZKFacade::ZKWatcher>
ZKFacade::unregisterWatcher(void* watcherContext) {
UniqueLock lock(_watchersMutex);
WatchersMap::iterator i = _watchers.find(watcherContext);
if (i == _watchers.end()) {
- return std::shared_ptr<ZKWatcher>();
+ return boost::shared_ptr<ZKWatcher>();
} else {
- std::shared_ptr<ZKWatcher> result = i->second;
+ boost::shared_ptr<ZKWatcher> result = i->second;
_watchers.erase(i);
return result;
}
@@ -222,24 +223,30 @@ ZKFacade::unregisterWatcher(void* watcherContext) {
void
ZKFacade::invokeWatcher(void* watcherContext) {
- std::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext);
+ try {
+ boost::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext);
- if (!_watchersEnabled)
- return;
+ if (!_watchersEnabled)
+ return;
- if (watcher) {
- (*watcher->_nodeChangedWatcher)();
- } else {
- LOGFWD(error, "Invoke called on expired watcher.");
+ if (watcher) {
+ (*watcher->_nodeChangedWatcher)();
+ } else {
+ LOGFWD(error, "Invoke called on expired watcher.");
+ }
+ } catch(...) {
+ _exceptionRethrower->store(boost::current_exception());
}
}
/********** End live watchers ***************************************/
-ZKFacade::ZKFacade(const std::string& zkservers)
+ZKFacade::ZKFacade(const std::string& zkservers,
+ const boost::shared_ptr<ExceptionRethrower> &exceptionRethrower)
:_retriesEnabled(true),
_watchersEnabled(true),
+ _exceptionRethrower(exceptionRethrower),
_zhandle(zookeeper_init(zkservers.c_str(),
&ZKFacade::stateWatchingFun,
_zkSessionTimeOut,
@@ -255,15 +262,14 @@ ZKFacade::ZKFacade(const std::string& zkservers)
ZKFacade::~ZKFacade() {
disableRetries();
_watchersEnabled = false;
- vespalib::Gate done;
- std::thread closer([&done, zhandle=_zhandle] () { zookeeper_close(zhandle); done.countDown(); });
- if ( done.await(50*1000) ) {
+
+ boost::thread shutdownCaller(zookeeper_close, _zhandle);
+ if (shutdownCaller.timed_join(boost::posix_time::seconds(120))) {
LOGFWD(debug, "Zookeeper connection closed successfully.");
} else {
- LOGFWD(error, "Not able to close down zookeeper. Dumping core so you can figure out what is wrong");
+ LOGFWD(info, "Timed out waiting for the zookeeper connection to shut down.");
abort();
}
- closer.join();
}
const std::string
@@ -439,9 +445,13 @@ ZKFacade::addEphemeralNode(const Path& path) {
void
ZKFacade::remove(const Path& path) {
+ namespace ll = boost::lambda;
+
std::vector< std::string > children = getChildren(path);
if (!children.empty()) {
- std::for_each(children.begin(), children.end(), [&](const std::string & s){ remove(path / s); });
+ std::for_each(children.begin(), children.end(),
+ ll::bind(&ZKFacade::remove, this,
+ ll::ret<Path>(path / ll::_1)));
}
try {
@@ -483,9 +493,12 @@ ZKFacade::retainOnly(const Path& path, const std::vector<std::string>& childrenT
Children toPreserveSorted(childrenToPreserve);
std::sort(toPreserveSorted.begin(), toPreserveSorted.end());
+ namespace ll = boost::lambda;
std::set_difference(current.begin(), current.end(),
toPreserveSorted.begin(), toPreserveSorted.end(),
- boost::make_function_output_iterator([&](const std::string & s){ remove(path / s); }));
+ boost::make_function_output_iterator(
+ ll::bind(&ZKFacade::remove, this,
+ ll::ret<Path>(path / ll::_1))));
}
std::vector< std::string >
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.h b/filedistribution/src/vespa/filedistribution/model/zkfacade.h
index 7631fa6d9dc..e46ed42fdec 100644
--- a/filedistribution/src/vespa/filedistribution/model/zkfacade.h
+++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.h
@@ -3,12 +3,13 @@
#include <string>
#include <vector>
-#include <mutex>
#include <boost/filesystem/path.hpp>
#include <boost/signals2.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <vespa/filedistribution/common/buffer.h>
#include <vespa/filedistribution/common/exception.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
struct _zhandle;
typedef _zhandle zhandle_t;
@@ -58,10 +59,11 @@ diagnosticUserLevelMessage(const ZKException& zk);
-class ZKFacade : public std::enable_shared_from_this<ZKFacade> {
+class ZKFacade : public boost::enable_shared_from_this<ZKFacade> {
volatile bool _retriesEnabled;
volatile bool _watchersEnabled;
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
zhandle_t* _zhandle;
const static int _zkSessionTimeOut = 30 * 1000;
const static size_t _maxDataSize = 1024 * 1024;
@@ -69,7 +71,7 @@ class ZKFacade : public std::enable_shared_from_this<ZKFacade> {
class ZKWatcher;
static void stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context);
public:
- typedef std::shared_ptr<ZKFacade> SP;
+ typedef boost::shared_ptr<ZKFacade> SP;
/* Lifetime is managed by ZKFacade.
Derived classes should only contain weak_ptrs to other objects
@@ -84,12 +86,12 @@ public:
virtual void operator()() = 0;
};
- typedef std::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP;
+ typedef boost::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP;
typedef boost::filesystem::path Path;
ZKFacade(const ZKFacade &) = delete;
ZKFacade & operator = (const ZKFacade &) = delete;
- ZKFacade(const std::string& zkservers);
+ ZKFacade(const std::string& zkservers, const boost::shared_ptr<ExceptionRethrower> &);
~ZKFacade();
bool hasNode(const Path&);
@@ -123,11 +125,11 @@ public:
private:
void* registerWatcher(const NodeChangedWatcherSP &); //returns watcherContext
- std::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext);
+ boost::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext);
void invokeWatcher(void* watcherContext);
- std::mutex _watchersMutex;
- typedef std::map<void*, std::shared_ptr<ZKWatcher> > WatchersMap;
+ boost::mutex _watchersMutex;
+ typedef std::map<void*, boost::shared_ptr<ZKWatcher> > WatchersMap;
WatchersMap _watchers;
};
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
index 70827305138..f9a4c777b30 100644
--- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
+++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
@@ -4,6 +4,9 @@
#include <ostream>
#include <algorithm>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/foreach.hpp>
#include "zkfacade.h"
#include "zkfiledbmodel.h"
@@ -204,7 +207,7 @@ ZKFileDBModel::getHostStatus(const std::string& hostName) {
hostStatus._numFilesToDownload = filesToDownload.size();
hostStatus._numFilesFinished = 0;
- for (const std::string & file : filesToDownload) {
+ BOOST_FOREACH(std::string file, filesToDownload) {
Path path = getPeersPath(file);
const PeerEntries peerEntries = getSortedChildren(*_zk, path);
@@ -239,7 +242,7 @@ ZKFileDBModel::cleanFiles(
_zk->retainOnly(_fileDBPath, filesToPreserve);
}
-ZKFileDBModel::ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk)
+ZKFileDBModel::ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk)
: _zk(zk)
{
createNode(_root, *_zk);
@@ -257,7 +260,8 @@ ZKFileDBModel::getProgress(const Path& path) {
else if (buffer.size() == 0)
return 0;
else {
- throw boost::enable_current_exception(InvalidProgressException()) <<errorinfo::Path(path);
+ throw boost::enable_current_exception(InvalidProgressException())
+ <<errorinfo::Path(path);
}
} catch (ZKNodeDoesNotExistsException& e) {
//progress information deleted
@@ -277,7 +281,7 @@ ZKFileDBModel::getProgress(const std::string& fileReference,
const PeerEntries peerEntries = getSortedChildren(*_zk, path);
PeerEntries::const_iterator current = peerEntries.begin();
- for (const std::string& host : hostsSortedAscending) {
+ BOOST_FOREACH(const std::string& host, hostsSortedAscending) {
PeerEntries::const_iterator candidate =
std::lower_bound(current, peerEntries.end(), host);
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
index 4249410c00e..cf180f4c780 100644
--- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
+++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
@@ -1,6 +1,8 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <boost/shared_ptr.hpp>
+
#include "filedistributionmodel.h"
#include "zkfacade.h"
@@ -10,7 +12,7 @@ class ZKFileDBModel : public FileDBModel {
public:
typedef boost::filesystem::path Path;
private:
- const std::shared_ptr<ZKFacade> _zk;
+ const boost::shared_ptr<ZKFacade> _zk;
char getProgress(const Path& path);
void removeDeployFileNodes(const Path& hostPath, const std::string& appId);
void removeLegacyDeployFileNodes(const Path& hostPath);
@@ -47,7 +49,7 @@ public:
std::vector<std::string> getHosts();
HostStatus getHostStatus(const std::string& hostName);
- ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk);
+ ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk);
Progress getProgress(const std::string& fileReference,
const std::vector<std::string>& hostsSortedAscending);
diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
index 6579ea06f31..4e63b90d8b9 100644
--- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
+++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
@@ -3,6 +3,10 @@
#include "filedistributorrpc.h"
#include <boost/optional.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/foreach.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <vespa/log/log.h>
@@ -16,12 +20,10 @@ LOG_SETUP(".filedistributorrpc");
#include <vespa/filedistribution/model/filedbmodel.h>
using filedistribution::FileDistributorRPC;
-using filedistribution::FileProvider;
-
-namespace fs = boost::filesystem;
+namespace ll = boost::lambda;
namespace {
-typedef std::lock_guard<std::mutex> LockGuard;
+typedef boost::lock_guard<boost::mutex> LockGuard;
struct RPCErrorCodes {
const static uint32_t baseErrorCode = 0x10000;
@@ -33,7 +35,7 @@ struct RPCErrorCodes {
class QueuedRequests {
bool _shuttingDown;
- std::mutex _mutex;
+ boost::mutex _mutex;
typedef std::multimap<std::string, FRT_RPCRequest*> Map;
Map _queuedRequests;
@@ -44,8 +46,7 @@ class QueuedRequests {
typedef Map::iterator iterator;
std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference);
- for (iterator it(range.first); it != range.second; it++) {
- const Map::value_type & request(*it);
+ BOOST_FOREACH( const Map::value_type& request, range) {
LOG(info, "Returning earlier enqueued request for file reference '%s'.", request.first.c_str());
func(*request.second);
request.second->Return();
@@ -69,14 +70,14 @@ class QueuedRequests {
};
struct DownloadFailed {
- FileProvider::FailedDownloadReason _reason;
+ filedistribution::FileProvider::FailedDownloadReason _reason;
void operator()(FRT_RPCRequest& request) {
LOG(info, "Download failed: '%d'", _reason);
request.SetError(RPCErrorCodes::baseFileProviderErrorCode + _reason, "Download failed");
}
- DownloadFailed(FileProvider::FailedDownloadReason reason)
+ DownloadFailed(filedistribution::FileProvider::FailedDownloadReason reason)
:_reason(reason)
{}
};
@@ -115,13 +116,15 @@ public:
_queuedRequests.erase(candidate);
}
- void downloadFinished(const std::string& fileReference, const fs::path& path) {
+ void downloadFinished(const std::string& fileReference,
+ const boost::filesystem::path& path) {
DownloadFinished handler(path.string());
returnAnswer(fileReference, handler);
}
- void downloadFailed(const std::string& fileReference, FileProvider::FailedDownloadReason reason) {
+ void downloadFailed(const std::string& fileReference,
+ filedistribution::FileProvider::FailedDownloadReason reason) {
DownloadFailed handler(reason);
returnAnswer(fileReference, handler);
@@ -131,7 +134,7 @@ public:
LockGuard guard(_mutex);
_shuttingDown = true;
- for (const Map::value_type& request : _queuedRequests) {
+ BOOST_FOREACH( const Map::value_type& request, _queuedRequests) {
LOG(info, "Shutdown: Aborting earlier enqueued request for file reference '%s'.", request.first.c_str());
abort(request.second);
}
@@ -143,7 +146,7 @@ public:
class FileDistributorRPC::Server : public FRT_Invokable {
public:
- FileProvider::SP _fileProvider;
+ boost::shared_ptr<FileProvider> _fileProvider;
std::unique_ptr<FRT_Supervisor> _supervisor;
QueuedRequests _queuedRequests;
@@ -156,15 +159,16 @@ class FileDistributorRPC::Server : public FRT_Invokable {
Server(const Server &) = delete;
Server & operator = (const Server &) = delete;
- Server(int listen_port, const FileProvider::SP & provider);
- void start(const FileDistributorRPC::SP & parent);
+ Server(int listen_port, const boost::shared_ptr<FileProvider>& provider);
+ void start(const boost::shared_ptr<FileDistributorRPC> parent);
~Server();
void waitFor(FRT_RPCRequest*);
};
FileDistributorRPC::
-Server::Server(int listen_port, const FileProvider::SP & provider)
+Server::Server(int listen_port,
+ const boost::shared_ptr<filedistribution::FileProvider>& provider)
:_fileProvider(provider),
_supervisor(new FRT_Supervisor())
{
@@ -174,7 +178,8 @@ Server::Server(int listen_port, const FileProvider::SP & provider)
}
-FileDistributorRPC::Server::~Server() {
+FileDistributorRPC::
+Server::~Server() {
_queuedRequests.shutdown();
const bool waitForFinished = true;
@@ -182,16 +187,16 @@ FileDistributorRPC::Server::~Server() {
}
void
-FileDistributorRPC::Server::start(const FileDistributorRPC::SP & parent) {
+FileDistributorRPC::Server::start(const boost::shared_ptr<FileDistributorRPC> parent) {
_downloadCompletedConnection =
_fileProvider->downloadCompleted().connect(FileProvider::DownloadCompletedSignal::slot_type(
- [&] (const std::string &file, const fs::path& path) { _queuedRequests.downloadFinished(file, path); })
- .track_foreign(parent));
+ ll::bind(&QueuedRequests::downloadFinished, &_queuedRequests, ll::_1, ll::_2)).
+ track(parent));
_downloadFailedConnection =
_fileProvider->downloadFailed().connect(FileProvider::DownloadFailedSignal::slot_type(
- [&] (const std::string& file, FileProvider::FailedDownloadReason reason) { _queuedRequests.downloadFailed(file, reason); })
- .track_foreign(parent));
+ ll::bind(&QueuedRequests::downloadFailed, &_queuedRequests, ll::_1, ll::_2)).
+ track(parent));
}
@@ -209,7 +214,8 @@ Server::queueRequest(const std::string& fileReference, FRT_RPCRequest* request)
}
void
-FileDistributorRPC::Server::defineMethods() {
+FileDistributorRPC::
+Server::defineMethods() {
const bool instant = true;
FRT_ReflectionBuilder builder(_supervisor.get());
builder.DefineMethod("waitFor", "s", "s", instant,
@@ -217,12 +223,13 @@ FileDistributorRPC::Server::defineMethods() {
}
void
-FileDistributorRPC::Server::waitFor(FRT_RPCRequest* request) {
+FileDistributorRPC::
+Server::waitFor(FRT_RPCRequest* request) {
try {
frtstream::FrtServerStream requestHandler(request);
std::string fileReference;
requestHandler >> fileReference;
- boost::optional<fs::path> path
+ boost::optional<boost::filesystem::path> path
= _fileProvider->getPath(fileReference);
if (path) {
LOG(debug, "Returning request for file reference '%s'.", fileReference.c_str());
@@ -246,7 +253,7 @@ FileDistributorRPC::Server::waitFor(FRT_RPCRequest* request) {
}
FileDistributorRPC::FileDistributorRPC(const std::string& connectionSpec,
- const FileProvider::SP & provider)
+ const boost::shared_ptr<filedistribution::FileProvider>& provider)
:_server(new Server(get_port(connectionSpec), provider))
{}
diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
index 3c780bf5878..8c492ad4b5d 100644
--- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
+++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
@@ -8,14 +8,13 @@
namespace filedistribution {
-class FileDistributorRPC : public std::enable_shared_from_this<FileDistributorRPC>
+class FileDistributorRPC : public boost::enable_shared_from_this<FileDistributorRPC>
{
class Server;
public:
- using SP = std::shared_ptr<FileDistributorRPC>;
FileDistributorRPC(const FileDistributorRPC &) = delete;
FileDistributorRPC & operator = (const FileDistributorRPC &) = delete;
- FileDistributorRPC(const std::string& connectSpec, const FileProvider::SP & provider);
+ FileDistributorRPC(const std::string& connectSpec, const boost::shared_ptr<FileProvider>& provider);
void start();
diff --git a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
index 4eeeee5e359..a95b50fc0f2 100644
--- a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
+++ b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
@@ -10,7 +10,6 @@ namespace filedistribution {
class FileProvider
{
public:
- using SP = std::shared_ptr<FileProvider>;
typedef boost::signals2::signal<void (const std::string& /* fileReference */,
const boost::filesystem::path&)>
DownloadCompletedSignal;