diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /filedistribution |
Publish
Diffstat (limited to 'filedistribution')
97 files changed, 6731 insertions, 0 deletions
diff --git a/filedistribution/.gitignore b/filedistribution/.gitignore new file mode 100644 index 00000000000..542ce6cebef --- /dev/null +++ b/filedistribution/.gitignore @@ -0,0 +1,7 @@ +Makefile.ini +config_command.sh +project.dsw +/pom.xml.build +/target +Makefile +Testing diff --git a/filedistribution/CMakeLists.txt b/filedistribution/CMakeLists.txt new file mode 100644 index 00000000000..6205f44515b --- /dev/null +++ b/filedistribution/CMakeLists.txt @@ -0,0 +1,39 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_define_module( + DEPENDS + fastos + vespalog + fnet + frtstream + configdefinitions + config_cloudconfig + staging_vespalib + vespadefaults + vespalib + + EXTERNAL_DEPENDS + torrent-rasterbar + + LIBS + src/vespa/filedistribution/rpc + src/vespa/filedistribution/common + src/vespa/filedistribution/manager + src/vespa/filedistribution/rpcconfig + src/vespa/filedistribution/distributor + src/vespa/filedistribution/model + + TESTS + src/tests/zkfacade + src/tests/zkfiledbmodel + src/tests/filedbmodelimpl + src/tests/status + src/tests/scheduler + src/tests/rpc + src/tests/common + src/tests/filedownloader + src/tests/lib + + APPS + src/apps/status + src/apps/filedistributor +) diff --git a/filedistribution/OWNERS b/filedistribution/OWNERS new file mode 100644 index 00000000000..31af040f698 --- /dev/null +++ b/filedistribution/OWNERS @@ -0,0 +1 @@ +bratseth diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml new file mode 100644 index 00000000000..ea5fb5b450e --- /dev/null +++ b/filedistribution/pom.xml @@ -0,0 +1,48 @@ +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.vespa</groupId> + <artifactId>parent</artifactId> + <version>6-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <artifactId>filedistribution</artifactId> + <version>6-SNAPSHOT</version> + <packaging>jar</packaging> + <name>${project.artifactId}</name> + <dependencies> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-lib</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-class-plugin</artifactId> + <version>${project.version}</version> + <configuration> + <defFilesDirectories> + src/vespa/filedistribution/distributor, + src/vespa/filedistribution/model, + src/vespa/filedistribution/rpcconfig + </defFilesDirectories> + </configuration> + <executions> + <execution> + <id>config-gen</id> + <goals> + <goal>config-gen</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/filedistribution/src/.gitignore b/filedistribution/src/.gitignore new file mode 100644 index 00000000000..420f4e53d8a --- /dev/null +++ b/filedistribution/src/.gitignore @@ -0,0 +1,3 @@ +.depend +/Makefile.ini +/config_command.sh diff --git a/filedistribution/src/apps/.gitignore b/filedistribution/src/apps/.gitignore new file mode 100644 index 00000000000..f3c7a7c5da6 --- /dev/null +++ b/filedistribution/src/apps/.gitignore @@ -0,0 +1 @@ +Makefile diff --git a/filedistribution/src/apps/filedistributor/.gitignore b/filedistribution/src/apps/filedistributor/.gitignore new file mode 100644 index 00000000000..a41b1963b70 --- /dev/null +++ b/filedistribution/src/apps/filedistributor/.gitignore 
@@ -0,0 +1,2 @@ +/filedistributor +filedistributor-bin diff --git a/filedistribution/src/apps/filedistributor/CMakeLists.txt b/filedistribution/src/apps/filedistributor/CMakeLists.txt new file mode 100644 index 00000000000..f7be9d46c76 --- /dev/null +++ b/filedistribution/src/apps/filedistributor/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(filedistribution_filedistributor_app + SOURCES + filedistributor.cpp + OUTPUT_NAME filedistributor-bin + INSTALL sbin + DEPENDS + filedistribution_distributor + filedistribution_filedistributionmodel + filedistribution_filedistributorrpcconfig + filedistribution_filedistributorrpc + filedistribution_common +) +target_compile_options(filedistribution_filedistributor_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_unit_test_framework-mt-d) diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp new file mode 100644 index 00000000000..d284385784d --- /dev/null +++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp @@ -0,0 +1,401 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include <iostream> +#include <string> +#include <cstdlib> + +#include <boost/program_options.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/noncopyable.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> +#include <boost/exception/diagnostic_information.hpp> +#include <boost/scope_exit.hpp> + +#include <vespa/fastos/app.h> +#include <vespa/config-zookeepers.h> + +#include <vespa/filedistribution/rpcconfig/config-filedistributorrpc.h> +#include <vespa/filedistribution/distributor/config-filedistributor.h> +#include <vespa/filedistribution/model/config-filereferences.h> + +#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h> +#include <vespa/filedistribution/distributor/filedownloadermanager.h> +#include <vespa/filedistribution/distributor/filedownloader.h> +#include <vespa/filedistribution/distributor/signalhandling.h> +#include <vespa/filedistribution/distributor/state_server_impl.h> + +#include <vespa/filedistribution/model/filedistributionmodelimpl.h> +#include <vespa/filedistribution/rpc/filedistributorrpc.h> +#include <vespa/filedistribution/common/exception.h> +#include <vespa/filedistribution/common/componentsdeleter.h> + +namespace { +const char* programName = "filedistributor"; +} + +#include <vespa/log/log.h> +LOG_SETUP(programName); + +namespace ll = boost::lambda; + +using namespace filedistribution; +using cloud::config::ZookeepersConfig; +using cloud::config::filedistribution::FiledistributorConfig; +using cloud::config::filedistribution::FiledistributorrpcConfig; + +class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, + public config::IFetcherCallback<FiledistributorConfig>, + public config::IFetcherCallback<FiledistributorrpcConfig>, + public config::IGenerationCallback, + boost::noncopyable +{ + class Components : boost::noncopyable { + 
ComponentsDeleter _componentsDeleter; + public: + const boost::shared_ptr<ZKFacade> _zk; + const boost::shared_ptr<FileDistributionModelImpl> _model; + const boost::shared_ptr<FileDistributorTrackerImpl> _tracker; + const boost::shared_ptr<FileDownloader> _downloader; + const boost::shared_ptr<FileDownloaderManager> _manager; + const boost::shared_ptr<FileDistributorRPC> _rpcHandler; + const boost::shared_ptr<StateServerImpl> _stateServer; + + private: + boost::thread _downloaderEventLoopThread; + config::ConfigFetcher _configFetcher; + + + template <class T> + typename boost::shared_ptr<T> track(T* component) { + return _componentsDeleter.track(component); + } + + public: + + Components(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, + const config::ConfigUri & configUri, + const ZookeepersConfig& zooKeepersConfig, + const FiledistributorConfig& fileDistributorConfig, + const FiledistributorrpcConfig& rpcConfig) + :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, exceptionRethrower))), + _model(track(new FileDistributionModelImpl( + fileDistributorConfig.hostname, + fileDistributorConfig.torrentport, + _zk, + exceptionRethrower))), + _tracker(track(new FileDistributorTrackerImpl(_model, exceptionRethrower))), + _downloader(track(new FileDownloader( + _tracker, + fileDistributorConfig.hostname, + fileDistributorConfig.torrentport, + boost::filesystem::path(fileDistributorConfig.filedbpath), + exceptionRethrower))), + _manager(track(new FileDownloaderManager(_downloader, _model))), + _rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))), + _stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))), + _downloaderEventLoopThread( + ll::bind(&FileDownloader::runEventLoop, _downloader.get())), + _configFetcher(configUri.getContext()) + + { + _manager->start(); + _rpcHandler->start(); + + _tracker->setDownloader(_downloader); + _configFetcher.subscribe<FilereferencesConfig>(configUri.getConfigId(), 
_model.get()); + _configFetcher.start(); + updatedConfig(_configFetcher.getGeneration()); + } + + void updatedConfig(int64_t generation) { + vespalib::ComponentConfigProducer::Config curr("filedistributor", generation); + _stateServer->myComponents.addConfig(curr); + } + + ~Components() { + _configFetcher.close(); + //Do not waste time retrying zookeeper operations when going down. + _zk->disableRetries(); + + _downloaderEventLoopThread.interrupt(); + _downloaderEventLoopThread.join(); + } + + }; + + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _configMutex; + + bool _completeReconfigurationNeeded; + std::unique_ptr<ZookeepersConfig> _zooKeepersConfig; + std::unique_ptr<FiledistributorConfig> _fileDistributorConfig; + std::unique_ptr<FiledistributorrpcConfig> _rpcConfig; + + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + std::unique_ptr<Components> _components; +public: + FileDistributor() + : _configMutex(), + _completeReconfigurationNeeded(false), + _zooKeepersConfig(), + _fileDistributorConfig(), + _rpcConfig(), + _exceptionRethrower(), + _components() + { } + + void notifyGenerationChange(int64_t generation) { + if (_components && ! 
completeReconfigurationNeeded()) { + _components->updatedConfig(generation); + } + } + + //configure overrides + void configure(std::unique_ptr<ZookeepersConfig> config) { + LockGuard guard(_configMutex); + _zooKeepersConfig = std::move(config); + _completeReconfigurationNeeded = true; + } + + void configure(std::unique_ptr<FiledistributorConfig> config) { + LockGuard guard(_configMutex); + if (_fileDistributorConfig.get() != NULL && + (config->torrentport != _fileDistributorConfig->torrentport || + config->stateport != _fileDistributorConfig->stateport || + config->hostname != _fileDistributorConfig->hostname || + config->filedbpath != _fileDistributorConfig->filedbpath)) + { + _completeReconfigurationNeeded = true; + } else if (_components.get()) { + configureSpeedLimits(*config); + } + _fileDistributorConfig = std::move(config); + + } + + void configure(std::unique_ptr<FiledistributorrpcConfig> config) { + LockGuard guard(_configMutex); + _rpcConfig = std::move(config); + _completeReconfigurationNeeded = true; + } + + void run(const config::ConfigUri & configUri) { + while (!askedToShutDown()) { + clearReinitializeFlag(); + _exceptionRethrower.reset(new ExceptionRethrower()); + runImpl(configUri); + + if (_exceptionRethrower->exceptionStored()) + _exceptionRethrower->rethrow(); + } + } + + static void ensureExceptionsStored(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) { + //TODO: this is somewhat hackish, refactor to eliminate this later. 
+ LOG(debug, "Waiting for shutdown"); + for (int i=0; + i<50 && !exceptionRethrower.unique(); + ++i) { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + LOG(debug, "Done waiting for shutdown"); + + if (!exceptionRethrower.unique()) { + EV_STOPPING(programName, "Forced termination"); + kill(getpid(), SIGKILL); + } + } + + void createComponents(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, const config::ConfigUri & configUri) { + LockGuard guard(_configMutex); + _components.reset( + new Components(exceptionRethrower, + configUri, + *_zooKeepersConfig, + *_fileDistributorConfig, + *_rpcConfig)); + + configureSpeedLimits(*_fileDistributorConfig); + _completeReconfigurationNeeded = false; + } + + bool completeReconfigurationNeeded() { + LockGuard guard(_configMutex); + if (_completeReconfigurationNeeded) { + LOG(debug, "Complete reconfiguration needed"); + } + return _completeReconfigurationNeeded; + } + + void configureSpeedLimits(const FiledistributorConfig& config) { + FileDownloader& downloader = *_components->_downloader; + downloader.setMaxDownloadSpeed(config.maxdownloadspeed); + downloader.setMaxUploadSpeed(config.maxuploadspeed); + } + + //avoid warning due to scope exit macro +#pragma GCC diagnostic ignored "-Wshadow" + void runImpl(const config::ConfigUri & configUri) { + + BOOST_SCOPE_EXIT((&_components)(&_exceptionRethrower)) { + _components.reset(); + //Ensures that any exception stored during destruction will be available when returning. + ensureExceptionsStored(_exceptionRethrower); + } BOOST_SCOPE_EXIT_END + + createComponents(_exceptionRethrower, configUri); + + // We do not want back to back reinitializing as it gives zero time for serving + // some torrents. 
+ int postPoneAskedToReinitializedSecs = 50; + + while (!askedToShutDown() && + (postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) && + !completeReconfigurationNeeded() && + !_exceptionRethrower->exceptionStored()) { + postPoneAskedToReinitializedSecs--; + boost::this_thread::sleep(boost::posix_time::seconds(1)); + } + } +}; + +//TODO: use pop in gcc 4.6 +#pragma GCC diagnostic warning "-Wshadow" + +class FileDistributorApplication : public FastOS_Application { + const config::ConfigUri _configUri; +public: + FileDistributorApplication(const config::ConfigUri & configUri); + + //overrides + int Main(); +}; + +namespace { + +struct ProgramOptionException { + std::string _msg; + ProgramOptionException(const std::string & msg) + : _msg(msg) + {} +}; + +bool exists(const std::string& optionName, const boost::program_options::variables_map& map) { + return map.find(optionName) != map.end(); +} + +void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map \ + ) { + if (!exists(optionName, map)) { + throw ProgramOptionException("Error: Missing option " + optionName); + } +} + +} //anonymous namespace + +FileDistributorApplication::FileDistributorApplication(const config::ConfigUri & configUri) + :_configUri(configUri) { +} + +int +FileDistributorApplication::Main() { + try { + FileDistributor distributor; + + config::ConfigFetcher configFetcher(_configUri.getContext()); + configFetcher.subscribe<ZookeepersConfig>(_configUri.getConfigId(), &distributor); + configFetcher.subscribe<FiledistributorConfig>(_configUri.getConfigId(), &distributor); + configFetcher.subscribe<FiledistributorrpcConfig>(_configUri.getConfigId(), &distributor); + configFetcher.subscribeGenerationChanges(&distributor); + configFetcher.start(); + + distributor.run(_configUri); + + EV_STOPPING(programName, "Clean exit"); + return 0; + } catch(const FileDoesNotExistException & e) { + std::string s = boost::diagnostic_information(e); + 
EV_STOPPING(programName, s.c_str()); + return -1; + } catch(const ZKNodeDoesNotExistsException & e) { + std::string s = boost::diagnostic_information(e); + EV_STOPPING(programName, s.c_str()); + return -2; + } catch(const ZKGenericException & e) { + std::string s = boost::diagnostic_information(e); + EV_STOPPING(programName, s.c_str()); + return -3; + } catch(const boost::unknown_exception & e) { + std::string s = boost::diagnostic_information(e); + LOG(warning, "Caught '%s'", s.c_str()); + EV_STOPPING(programName, s.c_str()); + return -4; +#if 0 + } catch(const boost::exception& e) { + std::string s = boost::diagnostic_information(e); + LOG(error, "Caught '%s'", s.c_str()); + EV_STOPPING(programName, s.c_str()); + return -1; + } catch(const std::string& msg) { + std::string s = "Error: " + msg; + LOG(error, "Caught '%s'", s.c_str()); + EV_STOPPING(programName, s.c_str()); + return -1; +#endif + } +} + +int +executeApplication(int argc, char** argv) { + const char + *configId("configid"), + *help("help"); + + namespace po = boost::program_options; + po::options_description description; + description.add_options() + (configId, po::value<std::string > (), "id to request config for") + (help, "help"); + + try { + po::variables_map values; + po::store( + po::parse_command_line(argc, argv, description), + values); + + if (exists(help, values)) { + std::cout <<description; + return 0; + } + ensureExists(configId, values); + + FileDistributorApplication application( + values[configId].as<std::string > ()); + return application.Entry(argc, argv); + + } catch(ProgramOptionException& e) { + std::cerr <<e._msg <<std::endl; + return -1; + } +} + +void terminate() { + std::cerr <<"Terminate called: " <<std::endl; + Backtrace backtrace; + std::cerr <<backtrace <<std::endl; +} + +int +main(int argc, char** argv) { + EV_STARTED(programName); + initSignals(); + + std::srand(std::time(0)); + std::set_terminate(terminate); + filedistribution::setupZooKeeperLogging(); + + return 
executeApplication(argc, argv); +} diff --git a/filedistribution/src/apps/status/.gitignore b/filedistribution/src/apps/status/.gitignore new file mode 100644 index 00000000000..6dc1c1fff5d --- /dev/null +++ b/filedistribution/src/apps/status/.gitignore @@ -0,0 +1,2 @@ +/status-filedistribution +filedistribution_status-filedistribution_app diff --git a/filedistribution/src/apps/status/CMakeLists.txt b/filedistribution/src/apps/status/CMakeLists.txt new file mode 100644 index 00000000000..fb0b76af55b --- /dev/null +++ b/filedistribution/src/apps/status/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(filedistribution_status-filedistribution_app + SOURCES + status-filedistribution.cpp + INSTALL bin + DEPENDS + filedistribution_filedistributionmodel + filedistribution_common +) +target_compile_options(filedistribution_status-filedistribution_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_unit_test_framework-mt-d) +vespa_install_script(vespa-status-filedistribution.sh vespa-status-filedistribution bin) diff --git a/filedistribution/src/apps/status/status-filedistribution.cpp b/filedistribution/src/apps/status/status-filedistribution.cpp new 
file mode 100644 index 00000000000..90c21623016 --- /dev/null +++ b/filedistribution/src/apps/status/status-filedistribution.cpp @@ -0,0 +1,185 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("status-filedistribution"); + +#include <iostream> +#include <map> + +#include <boost/program_options.hpp> +#include <boost/foreach.hpp> +#include <boost/thread.hpp> + +#include <vespa/filedistribution/common/exceptionrethrower.h> +#include <vespa/filedistribution/model/zkfacade.h> +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/model/filedistributionmodelimpl.h> +#include <zookeeper/zookeeper.h> + +using namespace filedistribution; + +std::string +plural(size_t size) +{ + if (size == 1) + return ""; + else + return "s"; +} + +template <class CONT> +std::string +plural(const CONT& cont) +{ + size_t size = cont.size(); + return plural(size); +} + +typedef FileDBModel::HostStatus HostStatus; +typedef std::map<std::string, HostStatus> StatusByHostName; + +void +printWaitingForHosts(const StatusByHostName& notFinishedHosts) +{ + std::cout <<"Waiting for the following host" <<plural(notFinishedHosts) <<":" <<std::endl; + BOOST_FOREACH(const StatusByHostName::value_type hostNameAndStatus, notFinishedHosts) { + std::cout <<hostNameAndStatus.first <<" ("; + + const HostStatus& hostStatus = hostNameAndStatus.second; + if (hostStatus._state == HostStatus::notStarted) { + std::cout <<"Not started"; + } else { + std::cout <<"Downloading, " + <<hostStatus._numFilesFinished <<"/" <<hostStatus._numFilesToDownload + <<" file" <<plural(hostStatus._numFilesToDownload) <<" completed"; + } + std::cout <<")" <<std::endl; + } +} + +//TODO:refactor +int printStatus(const std::string& zkservers) +{ + boost::shared_ptr<ExceptionRethrower> exceptionRethrower; + boost::shared_ptr<ZKFacade> zk(new 
ZKFacade(zkservers, exceptionRethrower)); + + boost::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk)); + + std::vector<std::string> hosts = model->getHosts(); + + StatusByHostName notFinishedHosts; + StatusByHostName finishedHosts; + bool hasStarted = false; + + BOOST_FOREACH(std::string host, hosts) { + HostStatus hostStatus = model->getHostStatus(host); + switch (hostStatus._state) { + case HostStatus::finished: + hasStarted = true; + finishedHosts[host] = hostStatus; + break; + case HostStatus::inProgress: + hasStarted = true; + case HostStatus::notStarted: + notFinishedHosts[host] = hostStatus; + break; + } + } + + if (notFinishedHosts.empty()) { + std::cout <<"Finished distributing files to all hosts." <<std::endl; + return 0; + } else if (!hasStarted) { + std::cout <<"File distribution has not yet started." <<std::endl; + return 0; + } else { + printWaitingForHosts(notFinishedHosts); + return 5; + } +} + +int +printStatusRetryIfZKProblem(const std::string& zkservers, const std::string& zkLogFile) +{ + FILE* file = fopen(zkLogFile.c_str(), "w"); + if (file == NULL) { + std::cerr <<"Could not open file " <<zkLogFile <<std::endl; + } else { + zoo_set_log_stream(file); + } + zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); + + for (int i = 0; i < 5; ++i) { + try { + return printStatus(zkservers); + } catch (ZKNodeDoesNotExistsException& e) { + LOG(debug, "Node does not exists, assuming concurrent update. 
%s", boost::diagnostic_information(e).c_str()); + + } catch (ZKSessionExpired& e) { + LOG(debug, "Session expired."); + } + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + } + return 4; +} + + +//TODO: move to common +struct ProgramOptionException { + std::string _msg; + ProgramOptionException(const std::string & msg) + : _msg(msg) + {} +}; + +bool exists(const std::string& optionName, const boost::program_options::variables_map& map) { + return map.find(optionName) != map.end(); +} + +void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map \ + ) { + if (!exists(optionName, map)) { + throw ProgramOptionException("Error: Missing option " + optionName); + } +} +//END: move to common + + +int +main(int argc, char** argv) { + const char + *zkstring = "zkstring", + *zkLogFile = "zkLogFile", + *help = "help"; + + namespace po = boost::program_options; + boost::program_options::options_description description; + description.add_options() + (zkstring, po::value<std::string > (), "The zookeeper servers to connect to, separated by comma") + (zkLogFile, po::value<std::string >() -> default_value("/dev/null"), "Zookeeper log file") + (help, "help"); + + try { + boost::program_options::variables_map values; + po::store( + boost::program_options::parse_command_line(argc, argv, description), + values); + + if (exists(help, values)) { + std::cout <<description; + return 0; + } + + ensureExists(zkstring, values); + + return printStatusRetryIfZKProblem( + values[zkstring] .as<std::string>(), + values[zkLogFile].as<std::string>()); + } catch (const ProgramOptionException& e) { + std::cerr <<e._msg <<std::endl; + return 3; + } catch(const std::exception& e) { + std::cerr <<"Error: " <<e.what() <<std::endl; + return 3; + } +} diff --git a/filedistribution/src/apps/status/vespa-status-filedistribution.sh b/filedistribution/src/apps/status/vespa-status-filedistribution.sh new file mode 100644 index 00000000000..477e4f6bff5 --- 
/dev/null +++ b/filedistribution/src/apps/status/vespa-status-filedistribution.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +# BEGIN environment bootstrap section +# Do not edit between here and END as this section should stay identical in all scripts + +findpath () { + myname=${0} + mypath=${myname%/*} + myname=${myname##*/} + if [ "$mypath" ] && [ -d "$mypath" ]; then + return + fi + mypath=$(pwd) + if [ -f "${mypath}/${myname}" ]; then + return + fi + echo "FATAL: Could not figure out the path where $myname lives from $0" + exit 1 +} + +COMMON_ENV=libexec/vespa/common-env.sh + +source_common_env () { + if [ "$VESPA_HOME" ] && [ -d "$VESPA_HOME" ]; then + # ensure it ends with "/" : + VESPA_HOME=${VESPA_HOME%/}/ + export VESPA_HOME + common_env=$VESPA_HOME/$COMMON_ENV + if [ -f "$common_env" ]; then + . $common_env + return + fi + fi + return 1 +} + +findroot () { + source_common_env && return + if [ "$VESPA_HOME" ]; then + echo "FATAL: bad VESPA_HOME value '$VESPA_HOME'" + exit 1 + fi + if [ "$ROOT" ] && [ -d "$ROOT" ]; then + VESPA_HOME="$ROOT" + source_common_env && return + fi + findpath + while [ "$mypath" ]; do + VESPA_HOME=${mypath} + source_common_env && return + mypath=${mypath%/*} + done + echo "FATAL: missing VESPA_HOME environment variable" + echo "Could not locate $COMMON_ENV anywhere" + exit 1 +} + +findroot + +# END environment bootstrap section + +ROOT=$VESPA_HOME + +ZKSTRING=$($ROOT/libexec/vespa/vespa-config.pl -zkstring) +test -z "$VESPA_LOG_LEVEL" && VESPA_LOG_LEVEL=warning +export VESPA_LOG_LEVEL +exec $ROOT/libexec/vespa/status-filedistribution --zkstring "$ZKSTRING" $@ diff --git a/filedistribution/src/tests/.gitignore b/filedistribution/src/tests/.gitignore new file mode 100644 index 00000000000..ee50e13466c --- /dev/null +++ b/filedistribution/src/tests/.gitignore @@ -0,0 +1,2 @@ +Makefile +*_test diff --git 
a/filedistribution/src/tests/common/.gitignore b/filedistribution/src/tests/common/.gitignore new file mode 100644 index 00000000000..060721ea295 --- /dev/null +++ b/filedistribution/src/tests/common/.gitignore @@ -0,0 +1 @@ +filedistribution_common_test_app diff --git a/filedistribution/src/tests/common/CMakeLists.txt b/filedistribution/src/tests/common/CMakeLists.txt new file mode 100644 index 00000000000..b9670848fbc --- /dev/null +++ b/filedistribution/src/tests/common/CMakeLists.txt @@ -0,0 +1,13 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(filedistribution_common_test_app + SOURCES + testCommon.cpp + DEPENDS +) +target_compile_options(filedistribution_common_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_common_test_app NO_VALGRIND COMMAND filedistribution_common_test_app) diff --git a/filedistribution/src/tests/common/testCommon.cpp b/filedistribution/src/tests/common/testCommon.cpp new file mode 100644 index 00000000000..699d3628547 --- /dev/null +++ b/filedistribution/src/tests/common/testCommon.cpp @@ -0,0 +1,42 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN + +#include <vespa/fastos/fastos.h> +#include <vespa/filedistribution/common/buffer.h> + +#include <boost/test/unit_test.hpp> +#include <string> + +namespace fd = filedistribution; + +const size_t bufferCapacity = 10; + +fd::Move<fd::Buffer> +getBuffer() { + const char* test = "test"; + fd::Buffer buffer(test, test + strlen(test)); + buffer.reserve(bufferCapacity); + buffer.push_back(0); + return fd::move(buffer); +} + +BOOST_AUTO_TEST_CASE(bufferTest) { + fd::Buffer buffer(getBuffer()); + BOOST_CHECK(buffer.begin() != 0); + BOOST_CHECK_EQUAL(bufferCapacity, buffer.capacity()); + BOOST_CHECK_EQUAL(5u, buffer.size()); + BOOST_CHECK_EQUAL(std::string("test"), buffer.begin()); +} + +struct Callback { + bool* _called; + Callback(bool *called) + :_called(called) + {} + + void operator()(const std::string& str) { + BOOST_CHECK_EQUAL("abcd", str); + *_called = true; + } +}; diff --git a/filedistribution/src/tests/filedbmodelimpl/.gitignore b/filedistribution/src/tests/filedbmodelimpl/.gitignore new file mode 100644 index 00000000000..05a25df1258 --- /dev/null +++ b/filedistribution/src/tests/filedbmodelimpl/.gitignore @@ -0,0 +1 @@ +filedistribution_filedbmodelimpl_test_app diff --git a/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt b/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt new file mode 100644 index 00000000000..9bd7bd14ebb --- /dev/null +++ b/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_executable(filedistribution_filedbmodelimpl_test_app + SOURCES + test-filedistributionmodelimpl.cpp + DEPENDS + filedistribution_filedistributionmodel + filedistribution_common + filedistribution_mocks +) +target_compile_options(filedistribution_filedbmodelimpl_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_filedbmodelimpl_test_app NO_VALGRIND COMMAND filedistribution_filedbmodelimpl_test_app) diff --git a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp new file mode 100644 index 00000000000..84d1b5b958c --- /dev/null +++ b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN +#define BOOST_TEST_MODULE filedbmodelimpl test +#include <vespa/fastos/fastos.h> +#include <boost/test/unit_test.hpp> + +#include <iostream> +#include <vector> +#include <vespa/filedistribution/common/componentsdeleter.h> +#include <vespa/filedistribution/model/filedistributionmodelimpl.h> +#include <vespa/filedistribution/model/zkfacade.h> +#include <zookeeper/zookeeper.h> + + +using namespace filedistribution; + + +namespace { + + +struct Fixture { + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + ComponentsDeleter _componentsDeleter; + boost::shared_ptr<ZKFacade> _zk; + boost::shared_ptr<FileDistributionModelImpl> _distModel; + Fixture() { + _exceptionRethrower.reset(new ExceptionRethrower()); + _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower)); + _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk, _exceptionRethrower)); + } + ~Fixture() { } +}; + +} //anonymous namespace + + +BOOST_FIXTURE_TEST_SUITE(FileDistributionModelImplTests, Fixture) + +BOOST_AUTO_TEST_CASE(configServersAsPeers) +{ + std::vector<std::string> peers; + peers.push_back("old"); + peers.push_back("config:123"); + peers.push_back("config:567"); + peers.push_back("foo:123"); + _distModel->addConfigServersAsPeers(peers, "config,configTwo", 123); + BOOST_CHECK(peers.size() == 5); + BOOST_CHECK(peers[4] == "configTwo:123"); + _distModel->addConfigServersAsPeers(peers, NULL, 123); + BOOST_CHECK(peers.size() == 5); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/tests/filedownloader/.gitignore b/filedistribution/src/tests/filedownloader/.gitignore new file mode 100644 index 00000000000..f7bb72b4791 --- /dev/null +++ b/filedistribution/src/tests/filedownloader/.gitignore @@ -0,0 +1 @@ +filedistribution_filedownloader_test_app diff --git a/filedistribution/src/tests/filedownloader/CMakeLists.txt b/filedistribution/src/tests/filedownloader/CMakeLists.txt new 
file mode 100644 index 00000000000..6be9b28651d --- /dev/null +++ b/filedistribution/src/tests/filedownloader/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(filedistribution_filedownloader_test_app + SOURCES + testfiledownloader.cpp + DEPENDS + filedistribution_filedistributionmodel + filedistribution_common +) +target_compile_options(filedistribution_filedownloader_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_filedownloader_test_app NO_VALGRIND COMMAND filedistribution_filedownloader_test_app) diff --git a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp new file mode 100644 index 00000000000..94b505f4f9f --- /dev/null +++ b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp @@ -0,0 +1,158 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN + +#include <vespa/fastos/fastos.h> +#include <vespa/filedistribution/distributor/filedownloader.h> +#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h> + +#include <fstream> + +#include <boost/test/unit_test.hpp> +#include <boost/filesystem.hpp> +#include <boost/thread.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/filesystem/fstream.hpp> + +#include <libtorrent/session.hpp> +#include <libtorrent/tracker_manager.hpp> +#include <libtorrent/torrent.hpp> + +#include <vespa/filedistribution/manager/createtorrent.h> +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> +#include <vespa/filedistribution/common/componentsdeleter.h> + +namespace fs = boost::filesystem; + +using namespace filedistribution; + +namespace { +const std::string localHost("localhost"); +const int uploaderPort = 9113; +const int downloaderPort = 9112; + +#if 0 +boost::shared_ptr<FileDownloader> +createDownloader(ComponentsDeleter& deleter, + int port, const fs::path& downloaderPath, + const boost::shared_ptr<FileDistributionModel>& model, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) +{ + boost::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model, exceptionRethrower))); + boost::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker, + localHost, port, downloaderPath, exceptionRethrower))); + + tracker->setDownloader(downloader); + return downloader; +} +#endif + +} //anonymous namespace + +class MockFileDistributionModel : public FileDistributionModel { + virtual FileDBModel& getFileDBModel() { + abort(); + } + + virtual std::set<std::string> getFilesToDownload() { + return std::set<std::string>(); + } + + virtual PeerEntries getPeers(const std::string& , size_t) { + PeerEntries peers(2); + peers[0].ip = localHost; + peers[0].port = uploaderPort; + 
+ peers[1].ip = localHost; + peers[1].port = downloaderPort; + + return peers; + } + + virtual void addPeer(const std::string&) {} + virtual void removePeer(const std::string&) {} + virtual void peerFinished(const std::string&) {} +}; + + +#if 0 +BOOST_AUTO_TEST_CASE(fileDownloaderTest) { + fs::path testPath = "/tmp/filedownloadertest"; + fs::remove_all(testPath); + + fs::path downloaderPath = testPath / "downloader"; + fs::path uploaderPath = testPath / "uploader"; + + const std::string fileReference = "0123456789012345678901234567890123456789"; + const std::string fileToSend = "filetosend.txt"; + + fs::create_directories(downloaderPath); + fs::create_directories(uploaderPath / fileReference); + + fs::path fileToUploadPath = uploaderPath / fileReference / "filetosend.txt"; + + { + fs::ofstream stream(fileToUploadPath); + stream <<"Hello, world!" <<std::endl; + } + + CreateTorrent createTorrent(fileToUploadPath); + Buffer buffer(createTorrent.bencode()); + + ComponentsDeleter deleter; + boost::shared_ptr<ExceptionRethrower> exceptionRethrower(new ExceptionRethrower()); + + boost::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel())); + boost::shared_ptr<FileDownloader> downloader = + createDownloader(deleter, downloaderPort, downloaderPath, model, exceptionRethrower); + + boost::shared_ptr<FileDownloader> uploader = + createDownloader(deleter, uploaderPort, uploaderPath, model, exceptionRethrower); + + boost::thread uploaderThread( + boost::lambda::bind(&FileDownloader::runEventLoop, uploader.get())); + + boost::thread downloaderThread( + boost::lambda::bind(&FileDownloader::runEventLoop, downloader.get())); + + uploader->addTorrent(fileReference, buffer); + downloader->addTorrent(fileReference, buffer); + + sleep(5); + BOOST_CHECK(fs::exists(downloaderPath / fileReference / fileToSend)); + BOOST_CHECK(!exceptionRethrower->exceptionStored()); + + uploaderThread.interrupt(); + uploaderThread.join(); + + downloaderThread.interrupt(); 
+ downloaderThread.join(); + + fs::remove_all(testPath); +} +#endif + +//TODO: cleanup +libtorrent::sha1_hash +toInfoHash(const std::string& fileReference) { + assert (fileReference.size() == 40); + std::istringstream s(fileReference); + + libtorrent::sha1_hash infoHash; + s >> infoHash; + return infoHash; +} + +BOOST_AUTO_TEST_CASE(test_filereference_infohash_conversion) { + const std::string fileReference = "3a281c905c9b6ebe4d969037a198454fedefbdf3"; + + libtorrent::sha1_hash infoHash = toInfoHash(fileReference); + + std::ostringstream fileReferenceString; + fileReferenceString <<infoHash; + + BOOST_CHECK(fileReference == fileReferenceString.str()); + + std::cout <<fileReference <<std::endl <<fileReferenceString.str() <<std::endl; +} diff --git a/filedistribution/src/tests/lib/CMakeLists.txt b/filedistribution/src/tests/lib/CMakeLists.txt new file mode 100644 index 00000000000..2c710f315e8 --- /dev/null +++ b/filedistribution/src/tests/lib/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(filedistribution_mocks STATIC + SOURCES + mock-zookeeper.cpp + DEPENDS +) diff --git a/filedistribution/src/tests/lib/mock-zookeeper.cpp b/filedistribution/src/tests/lib/mock-zookeeper.cpp new file mode 100644 index 00000000000..82cd03a268e --- /dev/null +++ b/filedistribution/src/tests/lib/mock-zookeeper.cpp @@ -0,0 +1,326 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <zookeeper/zookeeper.h> + +#include <string> +#include <map> +#include <cassert> +#include <cstring> +#include <vector> + +#include <boost/thread.hpp> +#include <boost/lexical_cast.hpp> + +#include <iostream> + +#include <vespa/filedistribution/common/concurrentqueue.h> + +using std::map; +using std::string; +using std::vector; +using std::pair; +using std::make_pair; +using filedistribution::ConcurrentQueue; + +namespace { +std::pair<string, string> parentPathAndChildName(const string& childPath) +{ + if (childPath.empty()) { + return std::make_pair("", ""); + } else { + assert (childPath[0] == '/'); + + size_t index = childPath.find_last_of("/"); + return std::make_pair(childPath.substr(0, index), childPath.substr(index + 1)); + } +} + +struct Node { + typedef map<string, Node> Children; + Children children; + bool exists; + bool ephemeral; + vector<char> buffer; + vector<pair<watcher_fn, void*> > watchers; + + Node() + :exists(false), + ephemeral(false) + {} + + void addWatcher(watcher_fn fn, void* context) { + if (fn) + watchers.push_back(make_pair(fn, context)); + } + + void triggerWatches(zhandle_t* zh, const std::string& path); +}; + +boost::shared_ptr<Node> sharedRoot; + +struct ZHandle { + struct Worker { + ZHandle& zhandle; + + Worker(ZHandle* parent) : zhandle(*parent) {} + + void operator()(); + }; + + int sequence; + + boost::shared_ptr<Node> root; + boost::thread _watchersThread; + vector<string> ephemeralNodes; + + typedef boost::function<void (void)> InvokeWatcherFun; + ConcurrentQueue<InvokeWatcherFun> watcherInvocations; + + Node& getNode(const string& path); + + Node& getParent(const string& path); + + void ephemeralNode(const string&path) { + ephemeralNodes.push_back(path); + } + + ZHandle() : sequence(0), _watchersThread(Worker(this)) { + if (!sharedRoot) + sharedRoot.reset(new Node()); + + root = sharedRoot; + } + + ~ZHandle() { + std::for_each(ephemeralNodes.begin(), ephemeralNodes.end(), + boost::bind(&zoo_delete, (zhandle_t*)this, 
+ boost::bind(&string::c_str, _1), + 0)); + + _watchersThread.interrupt(); + _watchersThread.join(); + } +}; + +void +ZHandle::Worker::operator()() +{ + while (!boost::this_thread::interruption_requested()) { + InvokeWatcherFun fun = zhandle.watcherInvocations.pop(); + boost::this_thread::disable_interruption di; + fun(); + } +} + +Node& ZHandle::getNode(const string& path) { + auto splittedPath = parentPathAndChildName(path); + if (splittedPath.second.empty()) { + return *root; + } else { + return getNode(splittedPath.first).children[splittedPath.second]; + } +} + +Node& +ZHandle::getParent(const string& childPath) +{ + auto splittedPath = parentPathAndChildName(childPath); + if (splittedPath.second.empty()) { + throw "Can't get parent of root."; + } else { + return getNode(splittedPath.first); + } +} + +void +Node::triggerWatches(zhandle_t* zh, const std::string& path) { + for (auto i = watchers.begin(); i != watchers.end(); ++i) { + ((ZHandle*)zh)->watcherInvocations.push(boost::bind(i->first, zh, \ + /*TODO: type, state*/ 0, 0, + boost::bind(&string::c_str, path), + i->second)); + } + watchers.clear(); +} + +} //anonymous namespace + +extern "C" { + +ZOOAPI void zoo_set_debug_level(ZooLogLevel) {} +ZOOAPI zhandle_t *zookeeper_init(const char * host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, int flags) +{ + (void)host; + (void)fn; + (void)recv_timeout; + (void)clientid; + (void)context; + (void)flags; + + return (zhandle_t*)new ZHandle; +} + +ZOOAPI int zookeeper_close(zhandle_t *zh) +{ + delete (ZHandle*)zh; + return 0; +} + +ZOOAPI int zoo_create(zhandle_t *zh, const char *pathOrPrefix, const char *value, + int valuelen, const struct ACL_vector *, int flags, + char *path_buffer, int path_buffer_len) +{ + std::string path = pathOrPrefix; + if (flags & ZOO_SEQUENCE) + path += boost::lexical_cast<std::string>(((ZHandle*)zh)->sequence++); + + strncpy(path_buffer, path.c_str(), path_buffer_len); + Node& node = 
((ZHandle*)zh)->getNode(path); + node.exists = true; + + if (flags & ZOO_EPHEMERAL) + ((ZHandle*)zh)->ephemeralNode(path); + + node.buffer.resize(valuelen); + std::copy(value, value + valuelen, node.buffer.begin()); + + + node.triggerWatches(zh, path); + ((ZHandle*)zh)->getParent(path).triggerWatches(zh, + parentPathAndChildName(path).first); + + return 0; +} + + +ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer, + int buflen, int version) { + (void)version; + + Node& node = ((ZHandle*)zh)->getNode(path); + if (!node.exists) + return ZNONODE; + + + node.buffer.resize(buflen); + std::copy(buffer, buffer + buflen, node.buffer.begin()); + + node.triggerWatches(zh, path); + return 0; +} + + + +ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch, + struct String_vector *strings) +{ + (void)watch; + return zoo_wget_children(zh, path, + 0, 0, + strings); +} + +ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + struct String_vector *strings) +{ + Node& node = ((ZHandle*)zh)->getNode(path); + strings->count = node.children.size(); + strings->data = new char*[strings->count]; + + int index = 0; + for (auto i = node.children.begin(); i != node.children.end(); ++i) { + strings->data[index] = new char[i->first.length() + 1]; + std::strcpy(strings->data[index], &*i->first.begin()); + ++index; + } + + node.addWatcher(watcher, watcherCtx); + + return 0; +} + + + + +ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version) +{ + (void)version; + + std::string pathStr = path; + int index = pathStr.find_last_of("/"); + + if (pathStr.length() == 1) + throw "Can't delete root"; + + Node& parent = ((ZHandle*)zh)->getNode(pathStr.substr(0, index)); + parent.children.erase(pathStr.substr(index + 1)); + + ((ZHandle*)zh)->getParent(path).triggerWatches(zh, + parentPathAndChildName(path).first); + + return 0; +} + +void zoo_set_log_stream(FILE*) {} + +int deallocate_String_vector(struct 
String_vector *v) { + for (int i=0; i< v->count; ++i) { + delete[] v->data[i]; + } + delete[] v->data; + return 0; +} + + +ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer, + int* buffer_len, struct Stat *stat) +{ + (void)watch; + + return zoo_wget(zh, path, + 0, 0, + buffer, buffer_len, stat); + +} + +ZOOAPI int zoo_wget(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, + char *buffer, int* buffer_len, struct Stat *) +{ + Node& node = ((ZHandle*)zh)->getNode(path); + std::copy(node.buffer.begin(), node.buffer.end(), buffer); + *buffer_len = node.buffer.size(); + + node.addWatcher(watcher, watcherCtx); + return 0; +} + +ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path, + watcher_fn watcher, void* watcherCtx, struct Stat *) +{ + Node& node = ((ZHandle*)zh)->getNode(path); + + node.addWatcher(watcher, watcherCtx); + return node.exists ? ZOK : ZNONODE; +} + +ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat) +{ + (void)watch; + return zoo_wexists(zh, path, + 0, 0, + stat); +} + + + + +ZOOAPI ACL_vector ZOO_OPEN_ACL_UNSAFE; + +ZOOAPI const int ZOO_SEQUENCE = 1; +ZOOAPI const int ZOO_EPHEMERAL = 2; +ZOOAPI const int ZOO_SESSION_EVENT = 3; +ZOOAPI const int ZOO_EXPIRED_SESSION_STATE = 4; +ZOOAPI const int ZOO_AUTH_FAILED_STATE = 5; +} diff --git a/filedistribution/src/tests/rpc/.gitignore b/filedistribution/src/tests/rpc/.gitignore new file mode 100644 index 00000000000..b29a47efd87 --- /dev/null +++ b/filedistribution/src/tests/rpc/.gitignore @@ -0,0 +1 @@ +filedistribution_rpc_test_app diff --git a/filedistribution/src/tests/rpc/CMakeLists.txt b/filedistribution/src/tests/rpc/CMakeLists.txt new file mode 100644 index 00000000000..9a592c15ab4 --- /dev/null +++ b/filedistribution/src/tests/rpc/CMakeLists.txt @@ -0,0 +1,14 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_executable(filedistribution_rpc_test_app + SOURCES + testfileprovider.cpp + DEPENDS + filedistribution_filedistributorrpc +) +target_compile_options(filedistribution_rpc_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_rpc_test_app NO_VALGRIND COMMAND filedistribution_rpc_test_app) diff --git a/filedistribution/src/tests/rpc/mockfileprovider.h b/filedistribution/src/tests/rpc/mockfileprovider.h new file mode 100644 index 00000000000..745acc7196c --- /dev/null +++ b/filedistribution/src/tests/rpc/mockfileprovider.h @@ -0,0 +1,50 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <vespa/filedistribution/rpc/fileprovider.h> +#include <boost/thread/barrier.hpp> + +namespace filedistribution { + +class MockFileProvider : public FileProvider { + DownloadCompletedSignal _downloadCompleted; + DownloadFailedSignal _downloadFailed; +public: + static const std::string _queueForeverFileReference; + + boost::barrier _queueForeverBarrier; + + boost::optional<boost::filesystem::path> getPath(const std::string& fileReference) { + if (fileReference == "dd") { + return boost::filesystem::path("direct/result/path"); + } else { + return boost::optional<boost::filesystem::path>(); + } + } + + void downloadFile(const std::string& fileReference) { + if (fileReference == _queueForeverFileReference) { + _queueForeverBarrier.wait(); + return; + } + + sleep(1); + downloadCompleted()(fileReference, "downloaded/path/" + fileReference); + } + + //Overrides + DownloadCompletedSignal& downloadCompleted() { + return _downloadCompleted; + } + + DownloadFailedSignal& downloadFailed() { + return _downloadFailed; + } + + MockFileProvider() + :_queueForeverBarrier(2) + {} +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/tests/rpc/testfileprovider.cpp b/filedistribution/src/tests/rpc/testfileprovider.cpp new file mode 100644 index 00000000000..6881eb96b8a --- /dev/null +++ b/filedistribution/src/tests/rpc/testfileprovider.cpp @@ -0,0 +1,66 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN + +#include <vespa/fastos/fastos.h> +#include "mockfileprovider.h" +#include <vespa/filedistribution/rpc/filedistributorrpc.h> +#include <vespa/frtstream/frtclientstream.h> +#include <iostream> +#include <boost/test/unit_test.hpp> + +namespace fd = filedistribution; + +using fd::MockFileProvider; + +const std::string MockFileProvider::_queueForeverFileReference("queue-forever"); + +BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) { + const std::string spec("tcp/localhost:9111"); + boost::shared_ptr<fd::MockFileProvider> provider(new fd::MockFileProvider()); + boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); + fileDistributorRPC->start(); + + frtstream::FrtClientStream rpc(spec); + frtstream::Method method("waitFor"); + + std::string path; + rpc <<method <<"dd"; + rpc >> path; + BOOST_CHECK_EQUAL("direct/result/path", path); + + rpc <<method <<"0123456789abcdef"; + rpc >> path; + BOOST_CHECK_EQUAL("downloaded/path/0123456789abcdef", path); +} + +//must be run through valgrind +BOOST_AUTO_TEST_CASE(require_that_queued_requests_does_not_leak_memory) { + const std::string spec("tcp/localhost:9111"); + boost::shared_ptr<MockFileProvider> provider(new MockFileProvider()); + boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); + fileDistributorRPC->start(); + + FRT_Supervisor supervisor; + + supervisor.Start(); + FRT_Target *target = supervisor.GetTarget(spec.c_str()); + + FRT_RPCRequest* request = supervisor.AllocRPCRequest(); + request->SetMethodName("waitFor"); + request->GetParams()->AddString(MockFileProvider::_queueForeverFileReference.c_str()); + target->InvokeVoid(request); + + provider->_queueForeverBarrier.wait(); //the request has been enqueued. 
+ fileDistributorRPC.reset(); + + target->SubRef(); + supervisor.ShutDown(true); + +} + +BOOST_AUTO_TEST_CASE(require_that_port_can_be_extracted_from_connection_spec) { + BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/host:9056")); + BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/9056")); + BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("9056")); +} diff --git a/filedistribution/src/tests/scheduler/.gitignore b/filedistribution/src/tests/scheduler/.gitignore new file mode 100644 index 00000000000..b1976d1c516 --- /dev/null +++ b/filedistribution/src/tests/scheduler/.gitignore @@ -0,0 +1 @@ +filedistribution_scheduler_test_app diff --git a/filedistribution/src/tests/scheduler/CMakeLists.txt b/filedistribution/src/tests/scheduler/CMakeLists.txt new file mode 100644 index 00000000000..67b86c3e634 --- /dev/null +++ b/filedistribution/src/tests/scheduler/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_executable(filedistribution_scheduler_test_app + SOURCES + test-scheduler.cpp + DEPENDS + filedistribution_distributor + filedistribution_filedistributionmodel + filedistribution_common +) +target_compile_options(filedistribution_scheduler_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_scheduler_test_app NO_VALGRIND COMMAND filedistribution_scheduler_test_app) diff --git a/filedistribution/src/tests/scheduler/test-scheduler.cpp b/filedistribution/src/tests/scheduler/test-scheduler.cpp new file mode 100644 index 00000000000..cc669690a31 --- /dev/null +++ b/filedistribution/src/tests/scheduler/test-scheduler.cpp @@ -0,0 +1,110 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN +#include <vespa/fastos/fastos.h> +#include <boost/test/unit_test.hpp> + +#include <vespa/filedistribution/distributor/scheduler.h> + +#include <iostream> + +#include <boost/thread/barrier.hpp> + +using filedistribution::Scheduler; + +namespace asio = boost::asio; + +class TestException {}; + + +struct CallRun { + volatile bool _caughtException; + + CallRun() + :_caughtException(false) + {} + + void operator()(asio::io_service& ioService) { + while (!boost::this_thread::interruption_requested()) { + try { + //No reset needed after handling exceptions. + ioService.run(); + } catch(const TestException& e ) { + _caughtException = true; + } + } + } +}; + +struct Fixture { + CallRun callRun; + Scheduler scheduler; + + Fixture() + : scheduler(boost::ref(callRun)) + {} +}; + + +BOOST_FIXTURE_TEST_SUITE(SchedulerTest, Fixture) + + +struct RepeatedTask : Scheduler::Task { + void doHandle() { + std::cout <<"RepeatedTask::doHandle " <<std::endl; + schedule(boost::posix_time::seconds(1)); + } + + RepeatedTask(Scheduler& scheduler) : Task(scheduler) {} +}; + +BOOST_AUTO_TEST_CASE(require_tasks_does_not_keep_scheduler_alive) { + RepeatedTask::SP task(new RepeatedTask(scheduler)); + task->schedule(boost::posix_time::hours(10)); +} + +struct EnsureInvokedTask : Scheduler::Task { + boost::barrier& _barrier; + + void doHandle() { + _barrier.wait(); + } + + EnsureInvokedTask(Scheduler& scheduler, boost::barrier& barrier) : + Task(scheduler), + _barrier(barrier) + {} +}; + + +BOOST_AUTO_TEST_CASE(require_task_invoked) { + boost::barrier barrier(2); + + EnsureInvokedTask::SP task(new EnsureInvokedTask(scheduler, barrier)); + task->schedule(boost::posix_time::milliseconds(50)); + + barrier.wait(); +} + +struct ThrowExceptionTask : Scheduler::Task { + void doHandle() { + throw TestException(); + } + + ThrowExceptionTask(Scheduler& scheduler) : + Task(scheduler) + {} +}; + 
+BOOST_AUTO_TEST_CASE(require_exception_from_tasks_can_be_caught) { + ThrowExceptionTask::SP task(new ThrowExceptionTask(scheduler)); + task->scheduleNow(); + + for (int i=0; i<200 && !callRun._caughtException; ++i) { + boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100)); + } + + BOOST_CHECK(callRun._caughtException); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/tests/status/.gitignore b/filedistribution/src/tests/status/.gitignore new file mode 100644 index 00000000000..3da528fcd45 --- /dev/null +++ b/filedistribution/src/tests/status/.gitignore @@ -0,0 +1 @@ +filedistribution_status_test_app diff --git a/filedistribution/src/tests/status/CMakeLists.txt b/filedistribution/src/tests/status/CMakeLists.txt new file mode 100644 index 00000000000..79676c4e811 --- /dev/null +++ b/filedistribution/src/tests/status/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_executable(filedistribution_status_test_app + SOURCES + test-status.cpp + DEPENDS + filedistribution_filedistributionmodel + filedistribution_common +) +target_compile_options(filedistribution_status_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_status_test_app NO_VALGRIND COMMAND filedistribution_status_test_app) diff --git a/filedistribution/src/tests/status/test-status.cpp b/filedistribution/src/tests/status/test-status.cpp new file mode 100644 index 00000000000..7021752f316 --- /dev/null +++ b/filedistribution/src/tests/status/test-status.cpp @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN +#include <vespa/fastos/fastos.h> +#include <boost/test/unit_test.hpp> +#include <boost/foreach.hpp> + +#include <vespa/filedistribution/common/exceptionrethrower.h> +#include <vespa/filedistribution/model/zkfacade.h> +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/model/filedistributionmodelimpl.h> + +using namespace filedistribution; + + +BOOST_AUTO_TEST_CASE(test_retrieve_status) { + // TODO: +} + diff --git a/filedistribution/src/tests/zkfacade/.gitignore b/filedistribution/src/tests/zkfacade/.gitignore new file mode 100644 index 00000000000..6ffef2339b1 --- /dev/null +++ b/filedistribution/src/tests/zkfacade/.gitignore @@ -0,0 +1 @@ +filedistribution_zkfacade_test_app diff --git a/filedistribution/src/tests/zkfacade/CMakeLists.txt b/filedistribution/src/tests/zkfacade/CMakeLists.txt new file mode 100644 index 00000000000..2c8e05aca89 --- /dev/null +++ b/filedistribution/src/tests/zkfacade/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_executable(filedistribution_zkfacade_test_app + SOURCES + test-zkfacade.cpp + DEPENDS + filedistribution_filedistributionmodel + filedistribution_common + filedistribution_mocks +) +target_compile_options(filedistribution_zkfacade_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_unit_test_framework-mt-d) diff --git a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp new file mode 100644 index 00000000000..d45e5059a53 --- /dev/null +++ b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp @@ -0,0 +1,226 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN +#define BOOST_TEST_MODULE zkfacade test +#include <vespa/fastos/fastos.h> +#include <boost/test/unit_test.hpp> + +#include <iostream> + +#include <boost/thread/barrier.hpp> +#include <boost/thread/thread.hpp> +#include <boost/checked_delete.hpp> + +#include <vespa/filedistribution/common/componentsdeleter.h> +#include <vespa/filedistribution/model/zkfacade.h> + +#include <zookeeper/zookeeper.h> + + + +using namespace filedistribution; + +namespace { + + +struct Watcher : public ZKFacade::NodeChangedWatcher { + boost::barrier _barrier; + + Watcher() : + _barrier(2) {} + + void operator()() { + _barrier.wait(); + } +}; + +struct Fixture { + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + ComponentsDeleter _componentsDeleter; + boost::shared_ptr<ZKFacade> zk; + ZKFacade::Path testNode; + + Fixture() { + _exceptionRethrower.reset(new ExceptionRethrower()); + + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower)); + + testNode = "/test-node"; + zk->removeIfExists(testNode); + } + + ~Fixture() { + if (zk) { + zk->removeIfExists(testNode); + } + } +}; + +} //anonymous namespace + + +BOOST_FIXTURE_TEST_SUITE(ZKFacadeTests, Fixture) + +BOOST_AUTO_TEST_CASE(hasNode) +{ + zk->setData(testNode, "", 0); + BOOST_CHECK(zk->hasNode(testNode)); + + zk->remove(testNode); + BOOST_CHECK(!zk->hasNode(testNode)); +} + + +BOOST_AUTO_TEST_CASE(hasNodeNotification) +{ + boost::shared_ptr<Watcher> watcher(new Watcher); + + zk->hasNode(testNode, watcher); + zk->setData(testNode, "", 0); + watcher->_barrier.wait(); + + //after the notification has returned, the watcher must no longer reside in watchers map. 
+ for (int i=0; i<20 && !watcher.unique(); ++i) { + boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100)); + } + BOOST_CHECK(watcher.unique()); +} + +BOOST_AUTO_TEST_CASE(getAndSetData) +{ + std::string inputString = "test data."; + Buffer inputBuffer(inputString.begin(), inputString.end()); + + zk->setData(testNode, inputBuffer); + + Buffer outputBuffer(zk->getData(testNode)); + std::string outputString(outputBuffer.begin(), outputBuffer.end()); + + BOOST_CHECK(outputString == inputString); + + outputString = zk->getString(testNode); + BOOST_CHECK(outputString == inputString); +} + +BOOST_AUTO_TEST_CASE(setDataMustExist) +{ + bool mustExist = true; + BOOST_REQUIRE_THROW(zk->setData(testNode, "", 0, mustExist), ZKNodeDoesNotExistsException); +} + +BOOST_AUTO_TEST_CASE(createSequenceNode) +{ + zk->setData(testNode, "", 0); + + ZKFacade::Path prefix = testNode / "prefix"; + zk->createSequenceNode(prefix, "test", 4); + zk->createSequenceNode(prefix, "test", 4); + zk->createSequenceNode(prefix, "test", 4); + + std::vector<std::string> children = zk->getChildren(testNode); + BOOST_CHECK(children.size() == 3); + BOOST_CHECK(children.begin()->substr(0,6) == "prefix"); + + Buffer buffer(zk->getData(testNode / *children.begin())); + std::string bufferContent(buffer.begin(), buffer.end()); + + BOOST_CHECK(bufferContent == "test"); +} + +BOOST_AUTO_TEST_CASE(retainOnly) +{ + zk->setData(testNode, "", 0); + + zk->setData(testNode / "a", "", 0); + zk->setData(testNode / "b", "", 0); + zk->setData(testNode / "c", "", 0); + zk->setData(testNode / "d", "", 0); + + std::vector<std::string> toRetain; + toRetain.push_back("a"); + toRetain.push_back("c"); + + zk->retainOnly(testNode, toRetain); + std::vector<std::string> children = zk->getChildren(testNode); + + std::sort(children.begin(), children.end()); + BOOST_CHECK(children == toRetain); +} + + + +BOOST_AUTO_TEST_CASE(addEphemeralNode) +{ + ZKFacade::Path ephemeralNode = "/test-ephemeral-node"; + 
zk->removeIfExists(ephemeralNode); + + //Checked deleter is ok here since we're not installing any watchers + ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181", _exceptionRethrower), + boost::checked_deleter<ZKFacade>()); + zk2->addEphemeralNode(ephemeralNode); + + BOOST_CHECK(zk->hasNode(ephemeralNode)); + zk2.reset(); + BOOST_CHECK(!zk->hasNode(ephemeralNode)); +} + + + +BOOST_AUTO_TEST_CASE(dataChangedNotification) +{ + boost::shared_ptr<Watcher> watcher(new Watcher); + + zk->setData(testNode, "", 0); + Buffer buffer(zk->getData(testNode, watcher)); + BOOST_CHECK(buffer.size() == 0); + + bool mustExist = true; + zk->setData(testNode, "test", 4, mustExist); + watcher->_barrier.wait(); +} + +BOOST_AUTO_TEST_CASE(getChildrenNotification) +{ + boost::shared_ptr<Watcher> watcher(new Watcher); + + zk->setData(testNode, "", 0); + zk->getChildren(testNode, watcher); + + zk->setData(testNode / "child", "", 0); + watcher->_barrier.wait(); +} + +BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback) +{ + struct DeleteZKFacadeWatcher : public Watcher { + boost::shared_ptr<ZKFacade> _zk; + + DeleteZKFacadeWatcher(const boost::shared_ptr<ZKFacade>& zk) + :_zk(zk) + {} + + void operator()() { + BOOST_CHECK(_zk.use_count() == 2); + _zk.reset(); + Watcher::operator()(); + } + }; + + boost::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk)); + + zk->setData(testNode, "", 0); + zk->getData(testNode, watcher); + + ZKFacade* unprotectedZk = zk.get(); + zk.reset(); + + unprotectedZk->setData(testNode, "t", 1); + watcher->_barrier.wait(); + + //Must wait longer than the zookeeper_close timeout to catch + //problems due to closing zookeeper in a zookeeper watcher thread. 
+ sleep(3); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/tests/zkfiledbmodel/.gitignore b/filedistribution/src/tests/zkfiledbmodel/.gitignore new file mode 100644 index 00000000000..18c14c47c56 --- /dev/null +++ b/filedistribution/src/tests/zkfiledbmodel/.gitignore @@ -0,0 +1 @@ +filedistribution_zkfiledbmodel_test_app diff --git a/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt b/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt new file mode 100644 index 00000000000..7b205d009f8 --- /dev/null +++ b/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(filedistribution_zkfiledbmodel_test_app + SOURCES + test-zkfiledbmodel.cpp + DEPENDS + filedistribution_filedistributionmodel + filedistribution_common + filedistribution_mocks +) +target_compile_options(filedistribution_zkfiledbmodel_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_unit_test_framework-mt-d) +vespa_add_test(NAME filedistribution_zkfiledbmodel_test_app NO_VALGRIND COMMAND filedistribution_zkfiledbmodel_test_app) diff --git a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp 
b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp new file mode 100644 index 00000000000..b385949bb98 --- /dev/null +++ b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp @@ -0,0 +1,99 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MAIN +#define BOOST_TEST_MODULE zkfiledbmodel test +#include <vespa/fastos/fastos.h> +#include <boost/test/unit_test.hpp> + +#include <iostream> + +#include <boost/thread/barrier.hpp> +#include <boost/thread/thread.hpp> +#include <boost/checked_delete.hpp> + +#include <vespa/filedistribution/common/componentsdeleter.h> +#include <vespa/filedistribution/model/zkfacade.h> +#include <vespa/filedistribution/model/zkfiledbmodel.h> + +#include <zookeeper/zookeeper.h> + + +using namespace filedistribution; + +typedef boost::filesystem::path Path; + +namespace { + + +struct Fixture { + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + ComponentsDeleter _componentsDeleter; + boost::shared_ptr<ZKFacade> zk; + boost::shared_ptr<ZKFileDBModel> model; + + Fixture() { + _exceptionRethrower.reset(new ExceptionRethrower()); + + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower)); + zk->setData("/vespa", "", 0); + + model = _componentsDeleter.track(new ZKFileDBModel(zk)); + } +}; + +} //anonymous namespace + + +BOOST_FIXTURE_TEST_SUITE(ZKFileDBModelTests, Fixture) + +BOOST_AUTO_TEST_CASE(retainOnlyHostsForTenant) +{ + Path path = "/vespa/filedistribution/hosts"; + std::vector<std::string> files = {"myfile"}; + BOOST_CHECK(zk->hasNode("/vespa")); + BOOST_CHECK(zk->hasNode("/vespa/filedistribution")); + BOOST_CHECK(zk->hasNode(path)); + model->setDeployedFilesToDownload("testhost", "myapp:so:cool", files); + model->setDeployedFilesToDownload("testhost2", "myapp:so:cool", files); + 
model->setDeployedFilesToDownload("testhost3", "myapp:so:cool", files); + model->setDeployedFilesToDownload("testhost3", "myapp:legacyid:so:cool", files); + model->setDeployedFilesToDownload("testhost3", "yourapp:so:cool", files); + BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1); + BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); + BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 3); + + model->cleanDeployedFilesToDownload({"testhost3"}, "yourapp:so:cool"); + model->removeDeploymentsThatHaveDifferentApplicationId({"testhost3"}, "yourapp:so:cool"); + BOOST_CHECK(zk->hasNode(path / "testhost")); + BOOST_CHECK(zk->hasNode(path / "testhost2")); + BOOST_CHECK(zk->hasNode(path / "testhost3")); + BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1); + BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); + BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1); + + model->cleanDeployedFilesToDownload({"testhost"}, "myapp:not:cool"); + model->removeDeploymentsThatHaveDifferentApplicationId({"testhost"}, "myapp:not:cool"); + BOOST_CHECK(zk->hasNode(path / "testhost")); + BOOST_CHECK(zk->hasNode(path / "testhost2")); + BOOST_CHECK(zk->hasNode(path / "testhost3")); + BOOST_CHECK(zk->getChildren(path / "testhost").size() == 0); + BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); + BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1); + + model->cleanDeployedFilesToDownload({"testhost2"}, "myapp:so:cool"); + model->removeDeploymentsThatHaveDifferentApplicationId({"testhost2"}, "myapp:so:cool"); + + BOOST_CHECK(!zk->hasNode(path / "testhost")); + BOOST_CHECK(zk->hasNode(path / "testhost2")); + BOOST_CHECK(zk->hasNode(path / "testhost3")); + BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); + BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1); + + model->cleanDeployedFilesToDownload({"testhost2"}, "yourapp:so:cool"); + BOOST_CHECK(!zk->hasNode(path / "testhost")); + 
BOOST_CHECK(zk->hasNode(path / "testhost2")); + BOOST_CHECK(!zk->hasNode(path / "testhost3")); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/vespa/.gitignore b/filedistribution/src/vespa/.gitignore new file mode 100644 index 00000000000..a477b4a6e5f --- /dev/null +++ b/filedistribution/src/vespa/.gitignore @@ -0,0 +1,3 @@ +Makefile +config-*.h +config-*.cpp diff --git a/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt new file mode 100644 index 00000000000..ce692e6c7cc --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(filedistribution_exceptionrethrower OBJECT + SOURCES + exception.cpp + DEPENDS +) +vespa_add_library(filedistribution_common STATIC + SOURCES + componentsdeleter.cpp + exception.cpp + vespa_logfwd.cpp + DEPENDS +) +target_compile_options(filedistribution_common PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_add_target_system_dependency(filedistribution_common boost boost_system-mt-d) +vespa_add_target_system_dependency(filedistribution_common boost boost_thread-mt-d) +vespa_add_target_system_dependency(filedistribution_common boost boost_program_options-mt-d) +vespa_add_target_system_dependency(filedistribution_common boost boost_filesystem-mt-d) +vespa_add_target_system_dependency(filedistribution_common boost boost_unit_test_framework-mt-d) diff --git a/filedistribution/src/vespa/filedistribution/common/buffer.h b/filedistribution/src/vespa/filedistribution/common/buffer.h new file mode 100644 index 00000000000..0c0692798ac --- /dev/null +++ 
b/filedistribution/src/vespa/filedistribution/common/buffer.h @@ -0,0 +1,151 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/noncopyable.hpp> +#include <algorithm> + +namespace filedistribution { + +struct USED_FOR_MOVING {}; + +template <class T> +class Move { + mutable T _holder; +public: + Move(T& toMove) + :_holder(USED_FOR_MOVING()) + { + _holder.swap(toMove); + } + + Move(const Move& other) + :_holder(USED_FOR_MOVING()) + { + _holder.swap(other._holder); + } + + void swap(T& t) const { + _holder.swap(t); + } + +private: + Move& operator=(const Move&); +}; + +template <class T> +inline Move<T> move(T& t) { + return Move<T>(t); +} + +class Buffer : boost::noncopyable { + size_t _capacity; + char* _buf; + size_t _size; +public: + typedef char value_type; + typedef const value_type& const_reference; + typedef char* iterator; + typedef const char* const_iterator; + + explicit Buffer(size_t capacityArg) + :_capacity(capacityArg), + _buf( new char[_capacity] ), + _size(0) + {} + + Buffer(const Move<Buffer>& buffer); + + explicit Buffer(USED_FOR_MOVING) + :_capacity(0), + _buf(0), + _size(0) + {} + + template <typename ITER> + Buffer(ITER beginIter, ITER endIter) + : _capacity(endIter-beginIter), + _buf( new char[_capacity] ), + _size(_capacity) + { + std::copy(beginIter, endIter, begin()); + } + + ~Buffer() { + delete[] _buf; + } + + size_t capacity() const { + return _capacity; + } + + size_t size() const { + return _size; + } + + //might expose uninitialized memory + void resize(size_t newSize) { + if ( newSize <= _capacity ) + _size = newSize; + else { + reserve(newSize); + _size = newSize; + } + } + + void reserve(size_t newCapacity) { + if ( newCapacity > _capacity ) { + Buffer buffer(newCapacity); + buffer._size = _size; + std::copy(begin(), end(), buffer.begin()); + buffer.swap(*this); + } + } + + void swap(Buffer& other) { + std::swap(_capacity, 
other._capacity); + std::swap(_buf, other._buf); + std::swap(_size, other._size); + } + + //Appends c, growing the storage geometrically when it is full. + //A zero-capacity buffer grows to 1: doubling 0 would leave no room for the write below. + void push_back(char c) { + if (_size == _capacity) + reserve(_capacity ? _capacity * 2 : 1); + _buf[_size++] = c; + } + + iterator begin() { + return _buf; + } + + iterator end() { + return _buf + _size; + } + + const_iterator begin() const { + return _buf; + } + + const_iterator end() const { + return _buf + _size; + } + + //Unchecked element access; i is not validated against size(). + char operator[](size_t i) const { + return _buf[i]; + } + + char& operator[](size_t i) { + return _buf[i]; + } +}; + +//Starts out empty, then swaps in whatever the Move wrapper holds. +inline Buffer::Buffer(const Move<Buffer>& buffer) + :_capacity(0), + _buf(0), + _size(0) +{ + buffer.swap(*this); +} + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp new file mode 100644 index 00000000000..74b36b77a72 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp @@ -0,0 +1,89 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "componentsdeleter.h" + +#include <vespa/log/log.h> +LOG_SETUP(".componentsdeleter"); + +#include <boost/foreach.hpp> + + +using namespace filedistribution; + +struct ComponentsDeleter::Worker { + ComponentsDeleter& _parent; + + Worker(ComponentsDeleter* parent) + :_parent(*parent) {} + + void operator()(); +}; + + +void +ComponentsDeleter::Worker::operator()() +{ + while (!boost::this_thread::interruption_requested()) { + try { + CallDeleteFun deleteFun = _parent._deleteRequests.pop(); + boost::this_thread::disable_interruption di; + deleteFun(); + } catch(const std::exception& e) { + LOG(error, e.what()); + } + } +} + +ComponentsDeleter::ComponentsDeleter() + :_deleterThread(Worker(this)) +{} + +ComponentsDeleter::~ComponentsDeleter() +{ + waitForAllComponentsDeleted(); + _deleterThread.interrupt(); + _deleterThread.join(); +} + +void +ComponentsDeleter::waitForAllComponentsDeleted() +{ + LOG(debug, "Waiting for all components to be deleted"); + + for (int i=0; i<600 && !allComponentsDeleted(); ++i) { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + LOG(debug, "Done waiting for all components to be deleted"); + + logNotDeletedComponents(); + + if (!allComponentsDeleted()) + kill(getpid(), SIGKILL); +} + +bool +ComponentsDeleter::allComponentsDeleted() +{ + LockGuard guard(_trackedComponentsMutex); + return _trackedComponents.empty(); +} + +void +ComponentsDeleter::logNotDeletedComponents() +{ + LockGuard guard(_trackedComponentsMutex); + BOOST_FOREACH(TrackedComponentsMap::value_type component, _trackedComponents) { + LOG(info, "Timed out waiting for component '%s' to be deleted", component.second.c_str()); + } +} + +void +ComponentsDeleter::removeFromTrackedComponents(void* component) { + LockGuard guard(_trackedComponentsMutex); + if (_trackedComponents.count(component)) + LOG(debug, "Deleting '%s'", _trackedComponents[component].c_str()); + + size_t numErased = 
_trackedComponents.erase(component); + assert(numErased == 1); + (void) numErased; +} diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h new file mode 100644 index 00000000000..afb122b3a2f --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <map> +#include <typeinfo> +#include <string> + +#include <boost/function.hpp> +#include <boost/bind.hpp> +#include <boost/checked_delete.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/noncopyable.hpp> +#include <boost/thread.hpp> + +#include "concurrentqueue.h" + +namespace filedistribution { +/** + * Ensures that components are deleted in a separate thread, + * and that their lifetime is tracked. + * + * This prevents situations as e.g. deleting ZKFacade from a zookeeper watcher thread. 
+ */ +class ComponentsDeleter : boost::noncopyable { + class Worker; + typedef boost::lock_guard<boost::mutex> LockGuard; + + boost::mutex _trackedComponentsMutex; + typedef std::map<void*, std::string> TrackedComponentsMap; + TrackedComponentsMap _trackedComponents; + + typedef boost::function<void (void)> CallDeleteFun; + ConcurrentQueue<CallDeleteFun> _deleteRequests; + + boost::thread _deleterThread; + + void removeFromTrackedComponents(void* component); + + template <class T> + void deleteComponent(T* component) { + removeFromTrackedComponents(component); + delete component; + } + + template <class T> + void requestDelete(T* component) { + _deleteRequests.push(boost::bind(&ComponentsDeleter::deleteComponent<T>, this, component)); + } + + void waitForAllComponentsDeleted(); + bool allComponentsDeleted(); + void logNotDeletedComponents(); + public: + ComponentsDeleter(); + + /* + * Waits blocking for up to 60 seconds until all components are deleted. + * If it fails, the application is killed. + */ + ~ComponentsDeleter(); + + template <class T> + boost::shared_ptr<T> track(T* t) { + LockGuard guard(_trackedComponentsMutex); + + _trackedComponents[t] = typeid(t).name(); + return boost::shared_ptr<T>(t, boost::bind(&ComponentsDeleter::requestDelete<T>, this, t)); + } +}; +} + diff --git a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h new file mode 100644 index 00000000000..056ba3153a2 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h @@ -0,0 +1,53 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <queue> + +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> + +namespace filedistribution { + +//FIFO queue guarded by a mutex; pop() blocks on a condition variable while the queue is empty. +template <typename T> +class ConcurrentQueue { +public: + typedef T value_type; +private: + boost::condition_variable _nonEmpty; + + mutable boost::mutex _queueMutex; + typedef boost::unique_lock<boost::mutex> UniqueLock; + + std::queue<value_type> _queue; + +public: + //Appends a copy of t and wakes one waiter; the notification is sent after the lock is released. + void push(const T& t) { + { + UniqueLock guard(_queueMutex); + _queue.push(t); + } + _nonEmpty.notify_one(); + } + + //Blocks until the queue is non-empty, then removes and returns the front element. + //Assumes that value_type has a non-throwing copy constructor. + const T pop() { + UniqueLock guard(_queueMutex); + while (_queue.empty()) { + _nonEmpty.wait(guard); + } + T result = _queue.front(); + _queue.pop(); + return result; + } + + //Discards every queued element. + void clear() { + UniqueLock guard(_queueMutex); + while (!_queue.empty()) { + _queue.pop(); + } + } +}; + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/common/exception.cpp b/filedistribution/src/vespa/filedistribution/common/exception.cpp new file mode 100644 index 00000000000..7195a99d702 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/exception.cpp @@ -0,0 +1,24 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "exception.h" + +#include <execinfo.h> + +std::ostream& +filedistribution::operator<<(std::ostream& stream, const Backtrace& backtrace) { + char** strings = backtrace_symbols( + &*backtrace._frames.begin(), backtrace._size); + + stream <<"Backtrace:" <<std::endl; + for (size_t i = 0; i<backtrace._size; ++i) { + stream <<strings[i] <<std::endl; + } + + free(strings); + return stream; +} + + +filedistribution::Backtrace::Backtrace() + :_size(backtrace(&*_frames.begin(), _frames.size())) +{} diff --git a/filedistribution/src/vespa/filedistribution/common/exception.h b/filedistribution/src/vespa/filedistribution/common/exception.h new file mode 100644 index 00000000000..1d15253f883 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/exception.h @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/exception/all.hpp> +#include <boost/current_function.hpp> +#include <boost/exception/all.hpp> +#include <boost/array.hpp> +#include <boost/version.hpp> + +namespace filedistribution { + +class Backtrace { + static const size_t _maxBacktraceSize = 200; + public: + boost::array<void*, _maxBacktraceSize> _frames; + const size_t _size; + + Backtrace(); +}; + + +std::ostream& operator<<(std::ostream& stream, const Backtrace& backtrace); + +namespace errorinfo { +typedef boost::error_info<struct tag_Backtrace, Backtrace> Backtrace; +typedef boost::error_info<struct tag_UserMessage, Backtrace> ExplanationForUser; +} + +//Exceptions should inherit virtually from boost and std exception, +//see http://www.boost.org/doc/libs/1_39_0/libs/exception/doc/using_virtual_inheritance_in_exception_types.html +struct Exception : virtual boost::exception, virtual std::exception { + Exception() { + *this << errorinfo::Backtrace(Backtrace()); + } +}; + +} //namespace filedistribution + +#if BOOST_VERSION < 103700 +#define 
BOOST_THROW_EXCEPTION(x)\ + ::boost::throw_exception( ::boost::enable_error_info(x) << \ + ::boost::throw_function(BOOST_CURRENT_FUNCTION) << \ + ::boost::throw_file(__FILE__) << \ + ::boost::throw_line((int)__LINE__) ) + +#endif + + +//********** Begin: Please remove when fixed upstream. +//boost 1.36 & 1.37 bugfix: allow attaching a boost::filesytem::path to a boost::exception +//using the error info mechanism. +#include <boost/filesystem/path.hpp> + +namespace boost{ +namespace to_string_detail { +std::basic_ostream<boost::filesystem::path::string_type::value_type, + boost::filesystem::path::string_type::traits_type > & +operator<< +( std::basic_ostream<boost::filesystem::path::string_type::value_type, + boost::filesystem::path::string_type::traits_type >& os, const boost::filesystem::path & ph ); +} +} + +//********** End: Please remove when fixed upstream. + diff --git a/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h new file mode 100644 index 00000000000..28b45546a64 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> +#include <boost/exception_ptr.hpp> +#include <boost/type_traits/is_polymorphic.hpp> + +namespace filedistribution { + +//Used for rethrowing an exception in a different context than the one it was caught in. +//All access to the stored exception is serialized by _exceptionMutex. +class ExceptionRethrower { + boost::exception_ptr _exceptionPtr; //not a pod, default constructed to null value + + mutable boost::mutex _exceptionMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + +public: + //Rethrows the stored exception, if any; returns normally otherwise. + void rethrow() const { + LockGuard guard(_exceptionMutex); + + if (_exceptionPtr) + boost::rethrow_exception(_exceptionPtr); + } + + //True once an exception has been stored. + bool exceptionStored() const { + LockGuard guard(_exceptionMutex); + return _exceptionPtr; + } + + //Stores a copy of the given exception object. + template <class T> + void store(const T& exception) { + boost::exception_ptr exceptionPtr = boost::copy_exception(exception); + store(exceptionPtr); + } + + //Stores exceptionPtr unless an exception is already stored. + void store(const boost::exception_ptr exceptionPtr) { + LockGuard guard(_exceptionMutex); + + if (!_exceptionPtr) //only store the first exception to be rethrown. + _exceptionPtr = exceptionPtr; + } +}; + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/common/logfwd.h b/filedistribution/src/vespa/filedistribution/common/logfwd.h new file mode 100644 index 00000000000..1a2d7a559bf --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/logfwd.h @@ -0,0 +1,20 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace filedistribution { + +/** To avoid requiring vespa log from the jni library*/ +namespace logfwd { + +enum LogLevel { debug, error, warning, info }; + +void log(LogLevel level, const char *file, int line, const char *fmt, ...) + __attribute__((format(printf,4,5))); + +} //namespace logfwd +} //namespace filedistribution + +#define LOGFWD(level, ...) 
filedistribution::logfwd::log(filedistribution::logfwd::level, \ + __FILE__, __LINE__, __VA_ARGS__) + + diff --git a/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp b/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp new file mode 100644 index 00000000000..ab2a2334434 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <stdarg.h> +#include <boost/scoped_array.hpp> + +#include "logfwd.h" + +#include <vespa/log/log.h> + +LOG_SETUP(".common.model"); + + +using ns_log::Logger; + +namespace { + Logger::LogLevel toVespaLogLevel(filedistribution::logfwd::LogLevel level) { + namespace l = filedistribution::logfwd; + + switch (level) { + case l::info: return Logger::info; + case l::debug: return Logger::debug; + case l::error: return Logger::error; + case l::warning: return Logger::warning; + default: + LOG(error, "Unknown log level, falling back to error"); + return Logger::error; + } + } +} + +void filedistribution::logfwd::log(LogLevel level, const char* file, int line, const char* fmt, ...) +{ + + Logger::LogLevel vespaLogLevel = toVespaLogLevel(level); + + if (logger.wants(vespaLogLevel)) { + const size_t maxSize(0x8000); + boost::scoped_array<char> payload(new char[maxSize]); + + va_list args; + va_start(args, fmt); + vsnprintf(payload.get(), maxSize, fmt, args); + va_end(args); + + logger.doLog(vespaLogLevel, file, line, payload.get()); + } +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt new file mode 100644 index 00000000000..1e1baf01e61 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright 2016 Yahoo Inc. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(filedistribution_distributor STATIC + SOURCES + filedistributortrackerimpl.cpp + filedownloader.cpp + filedownloadermanager.cpp + hostname.cpp + scheduler.cpp + signalhandling.cpp + state_server_impl.cpp + DEPENDS +) +target_compile_options(filedistribution_distributor PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) +vespa_generate_config(filedistribution_distributor filedistributor.def) +install(FILES filedistributor.def DESTINATION var/db/vespa/config_server/serverdb/classes) diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributor.def b/filedistribution/src/vespa/filedistribution/distributor/filedistributor.def new file mode 100644 index 00000000000..654a98de5c4 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributor.def @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +namespace=cloud.config.filedistribution + +torrentport int +stateport int default = 0 +hostname string +filedbpath string + +maxdownloadspeed double +maxuploadspeed double diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp new file mode 100644 index 00000000000..4b6fbc96cb3 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp @@ -0,0 +1,203 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "filedistributortrackerimpl.h" + +#include <cmath> + +#include <libtorrent/tracker_manager.hpp> +#include <libtorrent/torrent.hpp> + +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include "filedownloader.h" +#include "hostname.h" + +#include <vespa/log/log.h> +LOG_SETUP(".filedistributiontrackerimpl"); + +using filedistribution::FileDistributorTrackerImpl; +using filedistribution::FileDownloader; +using filedistribution::FileDistributionModel; +using filedistribution::Scheduler; +using filedistribution::ExceptionRethrower; + +typedef FileDistributionModel::PeerEntries PeerEntries; + +namespace asio = boost::asio; + +namespace { + +void +filterSelf(FileDistributionModel::PeerEntries& peers, + const std::string& hostName, + int port) +{ + FileDistributionModel::PeerEntries::iterator + i = peers.begin(), + currEnd = peers.end(); + + while ( i != currEnd ) { + //hostName is currently used in the ip field + if (i->ip == hostName && i->port == port) { + --currEnd; + std::swap(*i, *currEnd); + } else { + ++i; + } + } + + peers.erase(currEnd, peers.end()); +} + +void resolveIPAddresses(PeerEntries& peers) { + for (auto& p: peers) { + try { + p.ip = filedistribution::lookupIPAddress(p.ip); + } catch (filedistribution::FailedResolvingHostName& e) { + LOG(info, "Failed resolving address %s", p.ip.c_str()); + } + } +} + +struct TrackingTask : public Scheduler::Task { + int _numTimesRescheduled; + + libtorrent::tracker_request _trackerRequest; + boost::weak_ptr<libtorrent::torrent> _torrent; + boost::weak_ptr<FileDownloader> _downloader; + boost::shared_ptr<FileDistributionModel> _model; + + TrackingTask(Scheduler& scheduler, + const libtorrent::tracker_request& trackerRequest, + const boost::shared_ptr<libtorrent::torrent>& torrent, + const boost::weak_ptr<FileDownloader>& downloader, + const boost::shared_ptr<FileDistributionModel>& model) + : Task(scheduler), + _numTimesRescheduled(0), + 
_trackerRequest(trackerRequest), + _torrent(torrent), + _downloader(downloader), + _model(model) + {} + + //TODO: refactor + void doHandle() { + if (boost::shared_ptr<FileDownloader> downloader = _downloader.lock()) { + //All torrents must be destructed before the session is destructed. + //It's okay to prevent the torrent from expiring here + //since the session can't be destructed while + //we hold a shared_ptr to the downloader. + if (boost::shared_ptr<libtorrent::torrent> torrent = _torrent.lock()) { + PeerEntries peers = getPeers(downloader); + + if (!peers.empty()) { + torrent->session().m_io_service.dispatch( + [torrent_weak_ptr = _torrent, trackerRequest = _trackerRequest, peers = peers]() mutable { + if (auto torrent_sp = torrent_weak_ptr.lock()) { + torrent_sp->tracker_response( + trackerRequest, + libtorrent::address(), + std::list<libtorrent::address>(), + peers, + -1, -1, -1, -1, -1, + libtorrent::address(), "trackerid"); + } + }); + } + + if (peers.size() < 5) { + reschedule(); + } + } + } + } + + PeerEntries getPeers(const boost::shared_ptr<FileDownloader>& downloader) { + std::string fileReference = downloader->infoHash2FileReference(_trackerRequest.info_hash); + + const size_t recommendedMaxNumberOfPeers = 30; + PeerEntries peers = _model->getPeers(fileReference, recommendedMaxNumberOfPeers); + + //currently, libtorrent stops working if it tries to connect to itself. 
+ filterSelf(peers, downloader->_hostName, downloader->_port); + resolveIPAddresses(peers); + for (const auto& peer: peers) { + LOG(debug, "Returning peer with ip %s", peer.ip.c_str()); + } + + return peers; + } + + void reschedule() { + if (_numTimesRescheduled < 5) { + double fudgeFactor = 0.1; + schedule(boost::posix_time::seconds(static_cast<int>( + std::pow(3., _numTimesRescheduled) + fudgeFactor))); + _numTimesRescheduled++; + } + } +}; + + +void +workerFunction(boost::shared_ptr<ExceptionRethrower> exceptionRethrower, asio::io_service& ioService) +{ + while (!boost::this_thread::interruption_requested()) { + try { + //No reset needed after handling exceptions. + ioService.run(); + } catch(const boost::thread_interrupted&) { + LOG(debug, "Tracker worker thread interrupted."); + throw; + } catch(...) { + exceptionRethrower->store(boost::current_exception()); + } + } +} + +} //anonymous namespace + + +FileDistributorTrackerImpl::FileDistributorTrackerImpl( + const boost::shared_ptr<FileDistributionModel>& model, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) + :_exceptionRethrower(exceptionRethrower), + _model(model) +{} + + +FileDistributorTrackerImpl::~FileDistributorTrackerImpl() { + LOG(debug, "Deconstructing FileDistributorTrackerImpl"); + + LockGuard guard(_mutex); + _scheduler.reset(); +} + + +void +FileDistributorTrackerImpl::trackingRequest( + libtorrent::tracker_request& request, + const boost::shared_ptr<libtorrent::torrent> & torrent) +{ + LockGuard guard(_mutex); + + if (torrent != boost::shared_ptr<libtorrent::torrent>()) { + boost::shared_ptr<TrackingTask> trackingTask(new TrackingTask( + *_scheduler.get(), request, torrent, _downloader, _model)); + + trackingTask->scheduleNow(); + } +} + + +void +FileDistributorTrackerImpl::setDownloader(const boost::shared_ptr<FileDownloader>& downloader) +{ + LockGuard guard(_mutex); + + _scheduler.reset(); + _downloader = downloader; + + if (downloader) { + _scheduler.reset(new 
Scheduler(boost::bind(&workerFunction, _exceptionRethrower, _1))); + } +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h new file mode 100644 index 00000000000..edbdb9b8943 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <libtorrent/session.hpp> +#include <libtorrent/torrent.hpp> + +#include <boost/thread.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/asio/io_service.hpp> +#include <boost/asio/deadline_timer.hpp> + +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> +#include "scheduler.h" + +namespace filedistribution { +class FileDistributionModel; +class FileDownloader; + +class FileDistributorTrackerImpl : public FileDistributionTracker { + const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + const boost::shared_ptr<FileDistributionModel> _model; + + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _mutex; + boost::weak_ptr<FileDownloader> _downloader; + + //Use separate worker thread to avoid potential deadlock + //between tracker requests and files to download changed requests. 
+ boost::scoped_ptr<Scheduler> _scheduler; +public: + FileDistributorTrackerImpl(const boost::shared_ptr<FileDistributionModel>& model, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower); + + virtual ~FileDistributorTrackerImpl(); + + //overrides + void trackingRequest(libtorrent::tracker_request& request, + const boost::shared_ptr<libtorrent::torrent> & torrent); + + void setDownloader(const boost::shared_ptr<FileDownloader>& downloader); +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp new file mode 100644 index 00000000000..57fae9df6ee --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp @@ -0,0 +1,437 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "filedownloader.h" +#include "hostname.h" + +#include <iterator> +#include <algorithm> + +#include <boost/filesystem.hpp> +#include <boost/filesystem/fstream.hpp> +#include <boost/filesystem/convenience.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/function_output_iterator.hpp> +#include <boost/foreach.hpp> +#include <boost/thread.hpp> + +#include <libtorrent/alert.hpp> +#include <libtorrent/alert_types.hpp> +#include <libtorrent/torrent_handle.hpp> +#include <libtorrent/bencode.hpp> + +#include <vespa/log/log.h> +LOG_SETUP(".filedownloader"); + +using filedistribution::FileDownloader; +namespace fs = boost::filesystem; + +using libtorrent::sha1_hash; +using libtorrent::torrent_handle; + +namespace { +const std::string resumeDataSuffix = ".resume"; +const std::string resumeDataSuffixTemp = ".resumetemp"; +const std::string newFileSuffix = ".new"; + +//TODO: temporarily duplicated from filedistributionmanager +std::string +fileReferenceToString(const 
libtorrent::sha1_hash& fileReference) { + std::ostringstream fileReferenceString; + fileReferenceString <<fileReference; + + assert (fileReferenceString.str().size() == 40); + return fileReferenceString.str(); +} + +libtorrent::sha1_hash +toInfoHash(const std::string& fileReference) { + assert (fileReference.size() == 40); + std::istringstream s(fileReference); + + sha1_hash infoHash; + s >> infoHash; + return infoHash; +} + +void +addNewFile(const fs::path& dbPath, const fs::path& newFile) { + LOG(debug, "Adding new file: '%s'.", newFile.string().c_str()); + const fs::path destination = dbPath / newFile.stem(); + + if ( fs::exists(destination) ) + fs::remove_all(destination); + + fs::path resumeData = destination.string() + resumeDataSuffix; + if ( fs::exists(resumeData) ) + fs::remove(resumeData); + + fs::rename(newFile, destination); +} + +void +addNewDbFiles(const fs::path& dbPath) { + for (fs::directory_iterator i(dbPath), end; i != end; ++i) { + if (newFileSuffix == fs::extension(*i)) + addNewFile(dbPath, *i); + } +} + +fs::path +resumeDataPath(const libtorrent::torrent_handle& torrent) { +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + fs::path tmp(torrent.save_path()); +#pragma GCC diagnostic pop + return tmp.string() + resumeDataSuffix; +} + +fs::path +resumeDataPathTemp(const libtorrent::torrent_handle& torrent) { +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + fs::path tmp(torrent.save_path()); +#pragma GCC diagnostic pop + return tmp.string() + resumeDataSuffixTemp; +} + +fs::path +getMainFile(const libtorrent::torrent_handle& handle) { + const libtorrent::torrent_info fallback(handle.info_hash()); + auto p = handle.torrent_file(); + const libtorrent::torrent_info& info = (p ? *p : fallback); + return info.files().num_files() == 1 ? 
+ info.file_at(0).path : + info.name(); +} + +std::string +getMainName(const libtorrent::torrent_handle& handle) { + const libtorrent::torrent_info fallback(handle.info_hash()); + auto p = handle.torrent_file(); + const libtorrent::torrent_info& info = (p ? *p : fallback); + return info.name(); +} + +libtorrent::session_settings +createSessionSettings() { + libtorrent::session_settings s; + + const int unlimited = -1; + s.active_downloads = s.active_seeds = s.active_limit = unlimited; + + s.min_reconnect_time = 1; //seconds + s.min_announce_interval = 5 * 60; //seconds + return s; +} + +} //anonymous namespace + +struct FileDownloader::EventHandler +{ + FileDownloader& _fileDownloader; + + EventHandler(FileDownloader* fileDownloader) + : _fileDownloader(*fileDownloader) + {} + + void defaultHandler(const libtorrent::alert& alert) const { + LOG(debug, "alert %s: %s", alert.what(), alert.message().c_str()); + } + + void operator()(const libtorrent::listen_failed_alert& alert) const { + BOOST_THROW_EXCEPTION(std::runtime_error(alert.message())); + } + void operator()(const libtorrent::fastresume_rejected_alert& alert) const { + LOG(info, "alert %s: %s", alert.what(), alert.message().c_str()); + } + void operator()(const libtorrent::torrent_delete_failed_alert& alert) const { + LOG(warning, "alert %s: %s", alert.what(), alert.message().c_str()); + } + void operator()(const libtorrent::file_error_alert& alert) const { + LOG(error, "alert %s: %s", alert.what(), alert.message().c_str()); + } + + void operator()(const libtorrent::torrent_finished_alert& alert) const { + defaultHandler(alert); + + std::string fileReference = fileReferenceToString(alert.handle.info_hash()); + + LOG(debug, "File '%s' with file reference '%s' downloaded successfully.", + getMainName(alert.handle).c_str(), + fileReference.c_str()); + + _fileDownloader.signalIfFinishedDownloading(fileReference); + alert.handle.save_resume_data(); + _fileDownloader.didRequestSRD(); + } + + void operator()(const 
libtorrent::save_resume_data_failed_alert& alert) const { + LOG(warning, "save resume data failed: %s -- %s", + alert.what(), alert.message().c_str()); + _fileDownloader.didReceiveSRD(); + } + + void operator()(const libtorrent::save_resume_data_alert& alert) const { + defaultHandler(alert); + + fs::ofstream resumeFile(resumeDataPathTemp(alert.handle), + std::ios_base::binary); + resumeFile.unsetf(std::ios_base::skipws); + libtorrent::bencode(std::ostream_iterator<char>(resumeFile), + *alert.resume_data); + resumeFile.close(); + fs::rename(resumeDataPathTemp(alert.handle), resumeDataPath(alert.handle)); + _fileDownloader.didReceiveSRD(); + } + + void handle(std::unique_ptr<libtorrent::alert> alert) { + try { + libtorrent::handle_alert< + libtorrent::torrent_finished_alert, + libtorrent::save_resume_data_alert, + libtorrent::save_resume_data_failed_alert, + libtorrent::listen_failed_alert, + libtorrent::file_error_alert, + libtorrent::fastresume_rejected_alert, + libtorrent::torrent_delete_failed_alert> + dispatch(alert, *this); + } catch (libtorrent::unhandled_alert& e) { + LOG(debug, "alert (ignored): %s -- %s", + alert->what(), alert->message().c_str()); + } + } +}; + +FileDownloader::LogSessionDeconstructed::~LogSessionDeconstructed() +{ + LOG(debug, "Libtorrent session closed successfully."); +} + +FileDownloader::FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker, + const std::string& hostName, int port, + const fs::path& dbPath, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) + : _outstanding_SRD_requests(0), + _tracker(tracker), + _session(tracker.get(), libtorrent::fingerprint("vp", 0, 0, 0, 0), 0), + _dbPath(dbPath), + _exceptionRethrower(exceptionRethrower), + _hostName(hostName), + _port(port) +{ + if (!fs::exists(_dbPath)) + fs::create_directories(_dbPath); + addNewDbFiles(_dbPath); + + _session.set_settings(createSessionSettings()); + listen(); + _session.set_alert_mask( + libtorrent::alert::error_notification | 
+ libtorrent::alert::status_notification); + +} + +FileDownloader::~FileDownloader() { + EventHandler eventHandler(this); + size_t cnt = 0; + do { + LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests); + while (_session.wait_for_alert(libtorrent::milliseconds(20))) { + std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); + eventHandler.handle(std::move(alert)); + ++cnt; + } + } while (_outstanding_SRD_requests > 0); + LOG(debug, "handled %zu alerts in destructor", cnt); +} + +void +FileDownloader::listen() { + for (int retries = 0; retries < 5; ++retries) { + boost::system::error_code ec; + _session.listen_on(std::make_pair(_port, _port), ec); // (min, max) + //If libtorrent fails listening on the specified port, + //it will automatically try to use port 0. + if (!ec && _session.listen_port() == _port) + return; + perror("Listen failed"); + LOG(debug, "Failed listening on '%d' message='%s'", _port, ec.message().c_str()); + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + } + + BOOST_THROW_EXCEPTION(FailedListeningException(_hostName, _port)); +} + +boost::optional< fs::path > +FileDownloader::pathToCompletedFile(const std::string& fileReference) const { + libtorrent::torrent_handle torrent = _session.find_torrent(toInfoHash(fileReference)); + +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + if (torrent.is_valid() && torrent.is_finished()) { +#pragma GCC diagnostic pop + return _dbPath / fileReference / getMainFile(torrent); + } else { + return boost::optional< fs::path>(); + } +} + + +boost::optional<FileDownloader::ResumeDataBuffer> +FileDownloader::getResumeData(const std::string& fileReference) { + LOG(debug, ("Reading resume data for " + fileReference).c_str()); + try { + fs::path path = (_dbPath / fileReference).string() + resumeDataSuffix; + if (fs::exists(path)) { + fs::ifstream file(path, std::ios::binary); + ResumeDataBuffer result; + + std::istream_iterator<char> iterator(file), end; + 
std::copy(iterator, end, std::back_inserter(result)); + LOG(debug, ("Successfully retrieved resume data for " + fileReference).c_str()); + if (result.size() < 50) { + LOG(info, "Very small resume file %zu bytes.", result.size()); + } + + return result; + } + } catch(...) { + //resume data is only an optimization + LOG(info, ("Error while reading resume data for " + fileReference).c_str()); + } + return boost::optional<ResumeDataBuffer>(); +} + + +bool +FileDownloader::hasTorrent(const std::string& fileReference) const { + return _session.find_torrent(toInfoHash(fileReference)).is_valid(); +} + +void +FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffer) { + LockGuard guard(_modifyTorrentsDownloadingMutex); + + boost::optional<ResumeDataBuffer> resumeData = getResumeData(fileReference); + + if (_session.find_torrent( (toInfoHash(fileReference))).is_valid()) + return; + + libtorrent::lazy_entry entry; +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(), + entry); //out +#pragma GCC diagnostic pop + + libtorrent::add_torrent_params torrentParams; + torrentParams.save_path = (_dbPath / fileReference).string(); + torrentParams.ti = new libtorrent::torrent_info(entry); + + torrentParams.auto_managed = false; + torrentParams.paused = false; + + if (resumeData) + torrentParams.resume_data = *resumeData; //vector will be swapped + + libtorrent::torrent_handle torrentHandle = _session.add_torrent(torrentParams); + + LOG(debug, "Started downloading file '%s' with file reference '%s'.", + getMainName(torrentHandle).c_str(), fileReference.c_str()); +} + + +void +FileDownloader::deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&) { + if (torrent.is_valid()) { + fs::path resumeFilePath = resumeDataPath(torrent); + +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + fs::path savePath = torrent.save_path(); +#pragma GCC diagnostic pop + + //the files might 
not exist, so ignore return value + fs::remove_all(savePath); + fs::remove(resumeFilePath); + } + + _downloadFailed(fileReferenceToString(torrent.info_hash()), FileProvider::FileReferenceRemoved); +} + +void +FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain) { + LockGuard guard(_modifyTorrentsDownloadingMutex); + + std::set<std::string> currentFiles; + namespace ll = boost::lambda; + + std::set<sha1_hash> infoHashesToRetain; + BOOST_FOREACH(const std::string& fileReference, filesToRetain) { + infoHashesToRetain.insert(toInfoHash(fileReference)); + } + + std::vector<torrent_handle> torrents = _session.get_torrents(); + + BOOST_FOREACH(torrent_handle torrent, torrents) { + if (!infoHashesToRetain.count(torrent.info_hash())) { + LOG(info, "Removing torrent: '%s' with file reference '%s'", + getMainName(torrent).c_str(), + fileReferenceToString(torrent.info_hash()).c_str()); + + deleteTorrentData(torrent, guard); + _session.remove_torrent(torrent); + } + } +} + + +void FileDownloader::runEventLoop() { + EventHandler eventHandler(this); + try { + while (!boost::this_thread::interruption_requested()) { + if (_session.wait_for_alert(libtorrent::milliseconds(100))) { + std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); + eventHandler.handle(std::move(alert)); + } + } + } catch(const boost::thread_interrupted&) { + LOG(spam, "The FileDownloader thread was interrupted."); + } catch(...) 
{ + _exceptionRethrower->store(boost::current_exception()); + } +} + +void +FileDownloader::signalIfFinishedDownloading(const std::string& fileReference) { + boost::optional<fs::path> path = pathToCompletedFile(fileReference); + + if (path) { + _downloadCompleted(fileReference, *path); + } +} + +std::string +FileDownloader::infoHash2FileReference(const libtorrent::sha1_hash& infoHash) { + //TODO + return fileReferenceToString(infoHash); +} + +namespace { +int +toBytesPerSec(double MBPerSec) { + return static_cast<int>(MBPerSec * 1024 * 1024); +} +} + +void +FileDownloader::setMaxDownloadSpeed(double MBPerSec) { + LOG(config, "Setting max download speed to %f MB/sec", MBPerSec); +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + _session.set_download_rate_limit(toBytesPerSec(MBPerSec)); +#pragma GCC diagnostic pop +} + +void +FileDownloader::setMaxUploadSpeed(double MBPerSec) { + LOG(config, "Setting max upload speed to %f MB/sec", MBPerSec); +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + _session.set_upload_rate_limit(toBytesPerSec(MBPerSec)); +#pragma GCC diagnostic pop +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h new file mode 100644 index 00000000000..467d1c8f2c8 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h @@ -0,0 +1,94 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <vector> +#include <boost/thread/mutex.hpp> +#include <boost/filesystem/path.hpp> +#include <boost/optional.hpp> +#include <boost/multi_index_container.hpp> +#include <boost/multi_index/indexed_by.hpp> +#include <boost/multi_index/member.hpp> +#include <boost/multi_index/ordered_index.hpp> + +#include <libtorrent/session.hpp> + +#include <vespa/filedistribution/rpc/fileprovider.h> +#include "hostname.h" +#include <vespa/filedistribution/common/buffer.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> +#include <vespa/filedistribution/common/exception.h> + +namespace filedistribution { + +struct NoSuchTorrentException : public Exception {}; + +struct FailedListeningException : public Exception { + FailedListeningException(const std::string& hostName, int port) { + *this <<errorinfo::HostName(hostName) <<errorinfo::Port(port); + } +}; + +class FileDownloader +{ + struct EventHandler; + struct LogSessionDeconstructed { + ~LogSessionDeconstructed(); + }; + + size_t _outstanding_SRD_requests; + boost::shared_ptr<FileDistributionTracker> _tracker; + + boost::mutex _modifyTorrentsDownloadingMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + + LogSessionDeconstructed _logSessionDeconstructed; + //session is safe to use from multiple threads. 
+ libtorrent::session _session; + + const boost::filesystem::path _dbPath; + typedef std::vector<char> ResumeDataBuffer; + boost::optional<ResumeDataBuffer> getResumeData(const std::string& fileReference); + + class RemoveTorrent; + + void deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&); + void listen(); +public: + // accounting of save-resume-data requests: + void didRequestSRD() { ++_outstanding_SRD_requests; } + void didReceiveSRD() { --_outstanding_SRD_requests; } + + typedef FileProvider::DownloadCompletedSignal DownloadCompletedSignal; + typedef FileProvider::DownloadFailedSignal DownloadFailedSignal; + + FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker, + const std::string& hostName, int port, + const boost::filesystem::path& dbPath, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower); + ~FileDownloader(); + + void runEventLoop(); + void addTorrent(const std::string& fileReference, const Buffer& buffer); + bool hasTorrent(const std::string& fileReference) const; + boost::optional<boost::filesystem::path> pathToCompletedFile(const std::string& fileReference) const; + void removeAllTorrentsBut(const std::set<std::string> & filesToRetain); + + void signalIfFinishedDownloading(const std::string& fileReference); + + std::string infoHash2FileReference(const libtorrent::sha1_hash& hash); + void setMaxDownloadSpeed(double MBPerSec); + void setMaxUploadSpeed(double MBPerSec); + + const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + + const std::string _hostName; + const int _port; + + //signals + DownloadCompletedSignal _downloadCompleted; + DownloadFailedSignal _downloadFailed; //removed or error +}; + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp new file mode 100644 index 00000000000..af66ed86afa --- /dev/null +++ 
b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp @@ -0,0 +1,148 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".filedownloadermanager"); + +#include "filedownloadermanager.h" + +#include <iterator> +#include <sstream> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/thread.hpp> + +using filedistribution::FileDownloaderManager; + +namespace lambda = boost::lambda; + +namespace { +void logStartDownload(const std::set<std::string> & filesToDownload) { + std::ostringstream msg; + msg <<"StartDownloads:" <<std::endl; + std::copy(filesToDownload.begin(), filesToDownload.end(), + std::ostream_iterator<std::string>(msg, "\n")); + LOG(debug, msg.str().c_str()); +} +} //anonymous namespace + +FileDownloaderManager::FileDownloaderManager( + const boost::shared_ptr<FileDownloader>& downloader, + const boost::shared_ptr<FileDistributionModel>& model) + + :_fileDownloader(downloader), + _fileDistributionModel(model), + _startDownloads(this), + _setFinishedDownloadingStatus(this) +{} + +FileDownloaderManager::~FileDownloaderManager() { + LOG(debug, "Deconstructing FileDownloaderManager"); +} + +void +FileDownloaderManager::start() +{ + _downloadFailedConnection = + downloadFailed().connect( + DownloadFailedSignal::slot_type(lambda::bind(&FileDownloaderManager::removePeerStatus, this, lambda::_1)). + track(shared_from_this())); + + _downloadCompletedConnection = + downloadCompleted().connect( + DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus). + track(shared_from_this())); + + _filesToDownloadChangedConnection = + _fileDistributionModel->_filesToDownloadChanged.connect( + FileDistributionModel::FilesToDownloadChangedSignal::slot_type(boost::ref(_startDownloads)). 
+ track(shared_from_this())); +} + +boost::optional< boost::filesystem::path > +FileDownloaderManager::getPath(const std::string& fileReference) { + return _fileDownloader->pathToCompletedFile(fileReference); +} + +void +FileDownloaderManager::downloadFile(const std::string& fileReference) { + { + LockGuard updateFilesToDownloadGuard(_updateFilesToDownloadMutex); + _startDownloads.downloadFile(fileReference); + } + + //if the file is already downloading but not completed before the above call, + //the finished download callback might come before the interested party + //has called connectFinishedDownloadingHandler. + //An explicit call is therefore used to mitigate this problem: + //Do not hold updateFilesToDownloadMutex when calling this, as it might cause deadlock. + _fileDownloader->signalIfFinishedDownloading(fileReference); +} + +void +FileDownloaderManager::removePeerStatus(const std::string& fileReference) { + //TODO: Simplify by using separate thread for removal: + //currently called via StartDownloads which already holds a lock on updateFilesToDownloadMutex. 
+ + _fileDistributionModel->removePeer(fileReference); +} + +void +FileDownloaderManager::StartDownloads::downloadFile(const std::string& fileReference) { + if (!_parent._fileDownloader->hasTorrent(fileReference)) { + Buffer torrent( + _parent._fileDistributionModel->getFileDBModel().getFile(fileReference)); + + _parent._fileDistributionModel->addPeer(fileReference); + _parent._fileDownloader->addTorrent(fileReference, torrent); + } +} + + +void +FileDownloaderManager::StartDownloads::operator()() { + namespace ll = boost::lambda; + + LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex); + + std::set<std::string> filesToDownload = + _parent._fileDistributionModel->getFilesToDownload(); + logStartDownload(filesToDownload); + + std::for_each(filesToDownload.begin(), filesToDownload.end(), + ll::bind(&StartDownloads::downloadFile, this, ll::_1)); + + _parent._fileDownloader->removeAllTorrentsBut(filesToDownload); +} + +FileDownloaderManager::StartDownloads::StartDownloads(FileDownloaderManager* parent) + :_parent(*parent) +{} + + +FileDownloaderManager::SetFinishedDownloadingStatus::SetFinishedDownloadingStatus( + FileDownloaderManager* parent) + :_parent(*parent) +{} + +void +FileDownloaderManager::SetFinishedDownloadingStatus::operator()( + const std::string& fileReference, const boost::filesystem::path&) { + + //Prevent concurrent modifications to peer node in zk. + LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex); + + try { + _parent._fileDistributionModel->peerFinished(fileReference); + } catch(const FileDistributionModel::NotPeer&) { //Probably a concurrent removal of the torrent. + + //improve chance of libtorrent session being updated. 
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + if (_parent._fileDownloader->hasTorrent(fileReference)) { + + _parent._fileDistributionModel->addPeer(fileReference); + _parent._fileDistributionModel->peerFinished(fileReference); + } else { + LOG(debug, "OK: Torrent '%s' finished concurrently with its removal.", fileReference.c_str()); + } + } +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h new file mode 100644 index 00000000000..edebe5a8423 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/noncopyable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/signals2/signal.hpp> +#include <boost/enable_shared_from_this.hpp> + +#include <vespa/filedistribution/rpc/fileprovider.h> +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include "filedownloader.h" + +namespace filedistribution { + +class FileDownloaderManager : public FileProvider, + boost::noncopyable, + public boost::enable_shared_from_this<FileDownloaderManager> { + + class StartDownloads { + FileDownloaderManager& _parent; + public: + void operator()(); + void downloadFile(const std::string& fileReference); + StartDownloads(FileDownloaderManager* parent); + }; + + class SetFinishedDownloadingStatus { + FileDownloaderManager& _parent; + public: + void operator()(const std::string& fileReference, const boost::filesystem::path&); + SetFinishedDownloadingStatus(FileDownloaderManager*); + }; + + typedef boost::lock_guard<boost::mutex> LockGuard; + boost::mutex _updateFilesToDownloadMutex; + + boost::shared_ptr<FileDownloader> _fileDownloader; + boost::shared_ptr<FileDistributionModel> _fileDistributionModel; + StartDownloads 
_startDownloads; + SetFinishedDownloadingStatus _setFinishedDownloadingStatus; + + boost::signals2::scoped_connection _downloadFailedConnection; + boost::signals2::scoped_connection _downloadCompletedConnection; + boost::signals2::scoped_connection _filesToDownloadChangedConnection; + + void removePeerStatus(const std::string& fileReference); +public: + FileDownloaderManager(const boost::shared_ptr<FileDownloader>&, + const boost::shared_ptr<FileDistributionModel>& model); + ~FileDownloaderManager(); + void start(); + + boost::optional<boost::filesystem::path> getPath(const std::string& fileReference); + void downloadFile(const std::string& fileReference); + + //FileProvider overrides + DownloadCompletedSignal& downloadCompleted() { + return _fileDownloader->_downloadCompleted; + } + + DownloadFailedSignal& downloadFailed() { + return _fileDownloader->_downloadFailed; + } +}; + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp b/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp new file mode 100644 index 00000000000..acd5c982957 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include "hostname.h" + +#include <boost/asio.hpp> + +#include <vespa/log/log.h> +LOG_SETUP(".hostname"); +#include <vespa/vespalib/net/socket_address.h> + +namespace asio = boost::asio; + +std::string +filedistribution::lookupIPAddress(const std::string& hostName) +{ + auto best_addr = vespalib::SocketAddress::select_remote(0, hostName.c_str()); + if (!best_addr.valid()) { + BOOST_THROW_EXCEPTION(filedistribution::FailedResolvingHostName(hostName)); + } + const std::string address = best_addr.ip_address(); + LOG(debug, "Resolved hostname'%s' as '%s'", hostName.c_str(), address.c_str()); + return address; +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.h b/filedistribution/src/vespa/filedistribution/distributor/hostname.h new file mode 100644 index 00000000000..12732029c92 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/hostname.h @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <string> +#include <vespa/filedistribution/common/exception.h> + +namespace filedistribution { + +namespace errorinfo { +typedef boost::error_info<struct tag_HostName, std::string> HostName; +typedef boost::error_info<struct tag_Port, int> Port; +}; + + +std::string lookupIPAddress(const std::string& hostName); + +struct FailedResolvingHostName : public Exception { + FailedResolvingHostName(const std::string& hostName) { + *this <<errorinfo::HostName(hostName); + } +}; +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp new file mode 100644 index 00000000000..1ae0b0b1f95 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp @@ -0,0 +1,49 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "scheduler.h" + +#include <boost/bind.hpp> + +#include <iostream> + +namespace asio = boost::asio; + +using filedistribution::Scheduler; +typedef Scheduler::Task Task; + +Task::Task(Scheduler& scheduler) + : _timer(scheduler.ioService) +{} + +void +Task::schedule(asio::deadline_timer::duration_type delay) +{ + _timer.expires_from_now(delay); + _timer.async_wait(boost::bind(&Task::handle, shared_from_this(), _1)); +} + +void +Task::scheduleNow() +{ + schedule(boost::posix_time::seconds(0)); +} + +void +Task::handle(const boost::system::error_code& code) { + if (code != asio::error::operation_aborted) { + doHandle(); + } +} + + +Scheduler::Scheduler(boost::function<void (asio::io_service&)> callRun) + :_keepAliveWork(ioService), + _workerThread(boost::bind(callRun, boost::ref(ioService))) +{} + +Scheduler::~Scheduler() { + ioService.stop(); + _workerThread.interrupt(); + _workerThread.join(); + ioService.reset(); +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h new file mode 100644 index 00000000000..75d4fa2d57b --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <boost/asio/io_service.hpp> +#include <boost/asio/deadline_timer.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/thread.hpp> + + +namespace filedistribution { + +class Scheduler : boost::noncopyable { +public: + class Task : public boost::enable_shared_from_this<Task> { + boost::asio::deadline_timer _timer; + public: + typedef boost::shared_ptr<Task> SP; + + Task(Scheduler& scheduler); + + virtual ~Task() {} + + void schedule(boost::asio::deadline_timer::duration_type delay); + void scheduleNow(); + + void handle(const boost::system::error_code& code); + protected: + virtual void doHandle() = 0; + }; + +private: + boost::asio::io_service ioService; + + //keeps io_service.run() from exiting until it has been destructed, + //see http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/reference/io_service.html + boost::asio::io_service::work _keepAliveWork; + boost::thread _workerThread; + +public: + Scheduler(boost::function<void (boost::asio::io_service&)> callRun) ; + ~Scheduler(); +}; + +} + diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp new file mode 100644 index 00000000000..0465f8d7b29 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "signalhandling.h" +#include <vespa/vespalib/util/signalhandler.h> + +#include <vespa/log/log.h> +LOG_SETUP(".signalhandling"); + +typedef vespalib::SignalHandler SIG; + +void +initSignals() { + SIG::PIPE.ignore(); + SIG::INT.hook(); + SIG::TERM.hook(); + SIG::USR1.hook(); +} + +bool +askedToShutDown() { + bool result = SIG::INT.check() || SIG::TERM.check(); + if (result) { + LOG(debug, "Asked to shut down."); + } + return result; +} + +bool +askedToReinitialize() { + bool result = SIG::USR1.check(); + if (result) { + LOG(debug, "Asked to reinitialize."); + } + return result; +} + +void +clearReinitializeFlag() { + SIG::USR1.clear(); +} diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h new file mode 100644 index 00000000000..d140e101169 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +void +initSignals(); + +bool +askedToShutDown(); + +bool +askedToReinitialize(); + +void +clearReinitializeFlag(); + diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp new file mode 100644 index 00000000000..95a0a1a64ca --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/fastos/fastos.h> +#include "state_server_impl.h" + +namespace filedistribution { + +} // namespace filedistribution diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h new file mode 100644 index 00000000000..f52da27e597 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/net/state_server.h> +#include <vespa/vespalib/net/simple_metrics_producer.h> +#include <vespa/vespalib/net/simple_health_producer.h> +#include <vespa/vespalib/net/simple_component_config_producer.h> + +namespace filedistribution { + +struct StateServerImpl { + vespalib::SimpleHealthProducer myHealth; + vespalib::SimpleMetricsProducer myMetrics; + vespalib::SimpleComponentConfigProducer myComponents; + vespalib::StateServer myStateServer; + + StateServerImpl(int port) : myStateServer(port, myHealth, myMetrics, myComponents) {} +}; + +} // namespace filedistribution diff --git a/filedistribution/src/vespa/filedistribution/manager/.gitignore b/filedistribution/src/vespa/filedistribution/manager/.gitignore new file mode 100644 index 00000000000..b94a4aae8e0 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/.gitignore @@ -0,0 +1,5 @@ +.depend +Makefile +com_yahoo_vespa_filedistribution_*.h +*.So +/libfiledistributionmanager.so.5.1 diff --git a/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt new file mode 100644 index 00000000000..0643983bb9b --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. 
# See LICENSE in the project root.
# Library installed to lib64 under the output name "filedistributionmanager".
# The first source is the header generated by the javah command below; the
# two TARGET_OBJECTS entries pull in object files of other targets.
vespa_add_library(filedistribution_filedistributionmanager
    SOURCES
    ${CMAKE_CURRENT_BINARY_DIR}/com_yahoo_vespa_filedistribution_FileDistributionManager.h
    createtorrent.cpp
    filedb.cpp
    filedistributionmanager.cpp
    stderr_logfwd.cpp
    $<TARGET_OBJECTS:filedistribution_filedbmodel>
    $<TARGET_OBJECTS:filedistribution_exceptionrethrower>
    INSTALL lib64
    OUTPUT_NAME filedistributionmanager
    DEPENDS
    boost_system-mt-d
    boost_thread-mt-d
    boost_filesystem-mt-d
    zookeeper_mt
    ${JAVA_JVM_LIBRARY}
)
target_include_directories(filedistribution_filedistributionmanager PUBLIC SYSTEM ${JNI_INCLUDE_DIRS})
# Generates the JNI header from the FileDistributionManager class in the
# filedistributionmanager jar; rerun whenever that jar changes.
add_custom_command(
    OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/com_yahoo_vespa_filedistribution_FileDistributionManager.h
    COMMAND javah -classpath ${PROJECT_SOURCE_DIR}/filedistributionmanager/target/filedistributionmanager.jar com.yahoo.vespa.filedistribution.FileDistributionManager
    MAIN_DEPENDENCY ${PROJECT_SOURCE_DIR}/filedistributionmanager/target/filedistributionmanager.jar
    WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
)

# diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp new file mode 100644 index 00000000000..001edd0e20a --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp @@ -0,0 +1,90 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h> +#include "createtorrent.h" + +#include <iostream> +#include <fstream> +#include <cmath> +#include <iterator> +#include <sstream> +#include <string> + +#include <boost/filesystem/convenience.hpp> +#include <boost/lambda/lambda.hpp> + +#include "libtorrent/torrent_info.hpp" + +namespace fs = boost::filesystem; + +namespace { + +const libtorrent::size_type targetTorrentSize = 64 * 1024; + +void +aggregateFilenames(std::vector<std::string>& accumulator, const fs::path& path, std::string currPrefix) +{ + if (fs::is_directory(path)) { + for (fs::directory_iterator i(path), end; + i != end; + ++i) { + std::string newPrefix = currPrefix + "/" + std::string(i->path().filename().c_str()); + accumulator.push_back(newPrefix); + aggregateFilenames(accumulator, *i, newPrefix); + } + } +} + + +libtorrent::entry +createEntry(const fs::path& path) { + if (!fs::exists(path)) + throw std::runtime_error("Path '" + std::string(path.filename().c_str()) + " does not exists"); + + libtorrent::file_storage fileStorage; + libtorrent::add_files(fileStorage, path.string()); + + libtorrent::create_torrent torrent(fileStorage); + torrent.set_creator("vespa-filedistributor"); + torrent.set_priv(true); + torrent.add_tracker(""); + + libtorrent::set_piece_hashes(torrent, path.branch_path().string()); + return torrent.generate(); +} + +std::string +fileReferenceToString(const libtorrent::sha1_hash& fileReference) { + std::ostringstream fileReferenceString; + fileReferenceString <<fileReference; + return fileReferenceString.str(); +} + +} //anonymous namespace + +filedistribution:: +CreateTorrent:: +CreateTorrent(const boost::filesystem::path& path) + :_path(path), + _entry(createEntry(_path)) +{} + +const filedistribution::Move<filedistribution::Buffer> +filedistribution:: +CreateTorrent:: +bencode() const +{ + Buffer buffer(static_cast<int>(targetTorrentSize)); + libtorrent::bencode(std::back_inserter(buffer), _entry); + return move(buffer); +} + +const 
std::string +filedistribution:: +CreateTorrent:: +fileReference() const +{ +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + libtorrent::torrent_info info(_entry); +#pragma GCC diagnostic pop + return fileReferenceToString(info.info_hash()); +} diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.h b/filedistribution/src/vespa/filedistribution/manager/createtorrent.h new file mode 100644 index 00000000000..93d56fa9e6f --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/createtorrent.h @@ -0,0 +1,23 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vector> +#include <boost/filesystem/path.hpp> +#include <libtorrent/create_torrent.hpp> + +#include <vespa/filedistribution/common/buffer.h> + +namespace filedistribution { + +class CreateTorrent { + boost::filesystem::path _path; + libtorrent::entry _entry; +public: + + CreateTorrent(const boost::filesystem::path& path); + const Move<Buffer> bencode() const; + const std::string fileReference() const; +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/manager/field.h b/filedistribution/src/vespa/filedistribution/manager/field.h new file mode 100644 index 00000000000..6077ad7a341 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/field.h @@ -0,0 +1,46 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <jni.h> + +#include <vespa/filedistribution/common/exception.h> + +namespace filedistribution { + +struct BadFieldException : std::runtime_error { + explicit BadFieldException(const std::string& name) + : runtime_error("Could not lookup field '" + name + "'") + {} + + BadFieldException(const BadFieldException& e) + : runtime_error(e) + {} +}; + +template <class T> +class LongField { + jfieldID _fieldID; +public: + LongField() + {} + + LongField(jclass clazz, const char* fieldName, JNIEnv* env) + :_fieldID(env->GetFieldID(clazz, fieldName, "J")) { + + if (!_fieldID) + BOOST_THROW_EXCEPTION(BadFieldException(fieldName)); + } + + void set(jobject obj, T value, JNIEnv* env) { + jlong longValue = reinterpret_cast<long>(value); + env->SetLongField(obj, _fieldID, longValue); + } + + T get(jobject obj, JNIEnv* env) { + jlong longValue = env->GetLongField(obj, _fieldID); + return reinterpret_cast<T>(longValue); + } +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.cpp b/filedistribution/src/vespa/filedistribution/manager/filedb.cpp new file mode 100644 index 00000000000..9b0a665342a --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/filedb.cpp @@ -0,0 +1,57 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "filedb.h" + +#include <boost/filesystem.hpp> + +namespace fs = boost::filesystem; + +namespace { + +void copyDirectory(fs::path original, fs::path destination) +{ + fs::create_directory(destination); + for (fs::directory_iterator curr(original), end; + curr != end; + ++curr) { + + fs::path destPath = destination / curr->path().filename(); + if ( fs::is_directory(curr->status()) ) { + copyDirectory(*curr, destPath); + } else { + fs::copy_file(*curr, destPath); + } + } +} + +} //anonymous namespace + +filedistribution:: +FileDB:: +FileDB(fs::path dbPath) + :_dbPath(dbPath) +{} + + +void +filedistribution:: +FileDB:: +add(fs::path original, const std::string& name) +{ + fs::path targetPathTemp = _dbPath / (name + ".tmp"); + if ( fs::exists(targetPathTemp) ) + fs::remove_all(targetPathTemp); + + + fs::create_directory(targetPathTemp); + if ( !fs::is_directory(original) ) { + fs::copy_file(original, targetPathTemp / original.filename()); + } else { + copyDirectory(original, targetPathTemp / original.filename()); + } + + fs::path targetPath = _dbPath / (name + ".new"); + if ( fs::exists(targetPath) ) + fs::remove_all(targetPath); + fs::rename(targetPathTemp, targetPath); +} diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.h b/filedistribution/src/vespa/filedistribution/manager/filedb.h new file mode 100644 index 00000000000..d44436f9e7f --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/filedb.h @@ -0,0 +1,18 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <string> +#include <boost/filesystem/path.hpp> + +namespace filedistribution { + +class FileDB { + boost::filesystem::path _dbPath; + int _fd; +public: + FileDB(boost::filesystem::path dbPath); + void add(boost::filesystem::path original, const std::string& name); +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp new file mode 100644 index 00000000000..fde4302eee8 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp @@ -0,0 +1,236 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/filedistribution/manager/com_yahoo_vespa_filedistribution_FileDistributionManager.h> + +#include <memory> +#include <boost/lambda/lambda.hpp> + +#include <vespa/filedistribution/model/filedistributionmodel.h> +#include <vespa/filedistribution/model/zkfiledbmodel.h> +#include <vespa/filedistribution/model/mockfiledistributionmodel.h> +#include <vespa/filedistribution/model/zkfacade.h> +#include "jnistring.h" +#include "field.h" +#include "createtorrent.h" +#include "filedb.h" + +using namespace filedistribution; + +namespace fs = boost::filesystem; + +namespace { + +class NativeFileDistributionManager { + public: + std::unique_ptr<FileDBModel> _fileDBModel; + std::unique_ptr<FileDB> _fileDB; +}; + +LongField<NativeFileDistributionManager*> nativeFileDistributionManagerField; + +void throwRuntimeException(const char* msg, JNIEnv* env) +{ + //do not mask active exception. 
+ if (!env->ExceptionOccurred()) { + jclass runtimeExceptionClass = env->FindClass("java/lang/RuntimeException"); + if (runtimeExceptionClass) { + env->ThrowNew(runtimeExceptionClass, msg); + } + } +} + +template <class FIELD> +void deleteField(FIELD& field, jobject self, JNIEnv* env) +{ + delete field.get(self, env); + field.set(self, 0, env); +} + +} //anonymous namespace + + +#define STANDARDCATCH(returnStatement) \ + catch (const std::bad_alloc&) { \ + std::cerr<<"Error: Out of memory" <<std::endl; \ + /*might fail, therefore also uses stderror message*/ \ + throwRuntimeException("Out of memory", env); \ + returnStatement; \ + } catch(const filedistribution::ZKException& e) { \ + std::stringstream ss; \ + ss << "In" << __FUNCTION__ << ": "; \ + ss << diagnosticUserLevelMessage(e); \ + throwRuntimeException(ss.str().c_str(), env); \ + returnStatement; \ + } catch(const std::exception& e) { \ + throwRuntimeException(e.what(), env); \ + returnStatement; \ + } + +JNIEXPORT +void JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setup( + JNIEnv *env, jclass self) +{ + try { + filedistribution::setupZooKeeperLogging(); + nativeFileDistributionManagerField = LongField<NativeFileDistributionManager*>(self, "nativeFileDistributionManager", env); + } STANDARDCATCH() +} + + +namespace { +void initMockFileDBModel(NativeFileDistributionManager& manager) +{ + manager._fileDBModel.reset(new MockFileDBModel()); +} + +void initFileDBModel(NativeFileDistributionManager& manager, const std::string& zkServers) +{ + //Ignored for now, since we're not installing any watchers. 
+ boost::shared_ptr<ExceptionRethrower> ignoredRethrower(new ExceptionRethrower()); + boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers, ignoredRethrower)); + manager._fileDBModel.reset(new ZKFileDBModel(zk)); +} +} //end anonymous namespace + +JNIEXPORT +void JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_init( + JNIEnv *env, jobject self, + jbyteArray fileDBPathArg, jbyteArray zkServersArg) +{ + try { + JNIString zkServers(zkServersArg, env); + + nativeFileDistributionManagerField.set(self, new NativeFileDistributionManager(), env); + NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env); + if (zkServers._value == "mockfiledistributionmodel.testing") { + initMockFileDBModel(manager); + } else { + initFileDBModel(manager, zkServers._value); + } + + JNIString fileDBPath(fileDBPathArg, env); + manager._fileDB.reset(new FileDB(fileDBPath._value)); + + } STANDARDCATCH() +} + + +JNIEXPORT +jstring JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_addFileImpl( + JNIEnv *env, jobject self, + jbyteArray completePathArg) +{ + try { + JNIString completePath(completePathArg, env); + CreateTorrent createTorrent(completePath._value); + + std::string fileReference = createTorrent.fileReference(); + NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env); + + manager._fileDB->add(completePath._value, fileReference); + + FileDBModel& model = *manager._fileDBModel; + if (! 
model.hasFile(fileReference) ) { + model.addFile(fileReference, createTorrent.bencode()); + } + + //contains string with the characters 0-9 a-f + return env->NewStringUTF(fileReference.c_str()); + } STANDARDCATCH(return 0) +} + + +JNIEXPORT +void JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_shutdown( + JNIEnv *env, jobject self) +{ + deleteField(nativeFileDistributionManagerField, self, env); +} + + +JNIEXPORT +void JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setDeployedFilesImpl( + JNIEnv *env, jobject self, jbyteArray hostNameArg, + jbyteArray appIdArg, jobjectArray fileReferencesArg) +{ + try { + JNIString hostName(hostNameArg, env); + JNIString appId(appIdArg, env); + JNIArray<JNIString> fileReferences(fileReferencesArg, env); + + nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> + setDeployedFilesToDownload(hostName._value, appId._value, fileReferences._value); + } STANDARDCATCH() +} + + +JNIEXPORT +void JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_limitSendingOfDeployedFilesToImpl( + JNIEnv *env, jobject self, jobjectArray hostNamesArg, jbyteArray appIdArg) +{ + try { + JNIArray<JNIString> hostNames(hostNamesArg, env); + JNIString appId(appIdArg, env); + + nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> + cleanDeployedFilesToDownload(hostNames._value, appId._value); + } STANDARDCATCH() +} + +JNIEXPORT +void JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_removeDeploymentsThatHaveDifferentApplicationIdImpl( + JNIEnv *env, jobject self, jobjectArray hostNamesArg, jbyteArray appIdArg) +{ + try { + JNIArray<JNIString> hostNames(hostNamesArg, env); + JNIString appId(appIdArg, env); + + nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> + removeDeploymentsThatHaveDifferentApplicationId(hostNames._value, appId._value); + } STANDARDCATCH() +} + + + +JNIEXPORT +void JNICALL 
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_limitFilesTo( + JNIEnv *env, jobject self, jobjectArray fileReferencesArg) +{ + try { + JNIArray<JNIString> fileReferences(fileReferencesArg, env); + + nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> + cleanFiles(fileReferences._value); + } STANDARDCATCH() +} + + +JNIEXPORT +jbyteArray JNICALL +Java_com_yahoo_vespa_filedistribution_FileDistributionManager_getProgressImpl( + JNIEnv *env, jobject self, jbyteArray fileReferenceArg, jobjectArray hostNamesArg) +{ + try { + JNIString fileReference(fileReferenceArg, env); + JNIArray<JNIString> hostNames(hostNamesArg, env); + + const filedistribution::FileDBModel::Progress progress = + nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> + getProgress(fileReference._value, hostNames._value); + + jbyteArray result = env->NewByteArray(progress.size()); + if (!result) + return 0; //exception thrown when returning + + env->SetByteArrayRegion(result, 0, progress.size(), &*progress.begin()); + return result; + } STANDARDCATCH(return 0) +} diff --git a/filedistribution/src/vespa/filedistribution/manager/jnistring.h b/filedistribution/src/vespa/filedistribution/manager/jnistring.h new file mode 100644 index 00000000000..aa5d459232e --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/jnistring.h @@ -0,0 +1,78 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <exception> +#include <string> + +#include <jni.h> + +namespace filedistribution { + +class JNIString { + struct Repr { + JNIEnv* _env; + jbyteArray _str; + jint _size; + char* _repr; + + Repr(jbyteArray str, JNIEnv* env) + :_env(env), + _str(str), + _size(_env->GetArrayLength(str)), + _repr(reinterpret_cast<char*>(_env->GetPrimitiveArrayCritical(str, 0))) { + + if (!_repr) + throw std::bad_alloc(); + } + + ~Repr() { + _env->ReleasePrimitiveArrayCritical(_str, _repr, 0); + _env->DeleteLocalRef(_str); + } + }; +public: + typedef jbyteArray JavaValue; + typedef std::string Value; + Value _value; + + JNIString(jbyteArray str, JNIEnv* env) { + Repr repr(str, env); + Value result(repr._repr, repr._repr + repr._size); + _value.swap(result); + } +}; + +class JNIUtf8String { +public: + typedef jstring JavaValue; + typedef std::string Value; + std::string _value; + + JNIUtf8String(jstring str, JNIEnv* env) + :_value(env->GetStringUTFLength(str), 0) + { + env->GetStringUTFRegion(str, 0, _value.begin() - _value.end(), &*_value.begin()); + env->DeleteLocalRef(str); + } +}; + +template <class JNITYPE> +class JNIArray { +public: + std::vector<typename JNITYPE::Value> _value; + + JNIArray(jobjectArray array, JNIEnv* env) { + jsize length = env->GetArrayLength(array); + _value.reserve(length); + + for (jsize i=0; i<length; ++i) { + jobject element = env->GetObjectArrayElement(array, i); + JNITYPE elementValue(static_cast<typename JNITYPE::JavaValue>(element), env); + _value.push_back(elementValue._value); + } + env->DeleteLocalRef(array); + } +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp b/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp new file mode 100644 index 00000000000..686dffa5c00 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp @@ -0,0 +1,26 @@ +// Copyright 2016 Yahoo Inc. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/filedistribution/common/logfwd.h> + +#include <stdarg.h> +#include <iostream> +#include <boost/scoped_array.hpp> +#include <stdio.h> + + + +void filedistribution::logfwd::log(LogLevel level, const char* file, int line, const char* fmt, ...) +{ + if (level == debug || level == info) + return; + + const size_t maxSize(0x8000); + boost::scoped_array<char> payload(new char[maxSize]); + + va_list args; + va_start(args, fmt); + vsnprintf(payload.get(), maxSize, fmt, args); + va_end(args); + + std::cerr <<"Error: " <<payload.get() <<" File: " <<file <<" Line: " <<line <<std::endl; +} diff --git a/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt new file mode 100644 index 00000000000..58031d1a7f4 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt @@ -0,0 +1,20 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
# Object library holding the ZooKeeper-backed file DB model sources.
vespa_add_library(filedistribution_filedbmodel OBJECT
    SOURCES
    zkfiledbmodel.cpp
    zkfacade.cpp
    deployedfilestodownload.cpp
    DEPENDS
)
# Static library with the same model sources plus filedistributionmodelimpl.cpp.
vespa_add_library(filedistribution_filedistributionmodel STATIC
    SOURCES
    deployedfilestodownload.cpp
    filedistributionmodelimpl.cpp
    zkfacade.cpp
    zkfiledbmodel.cpp
    DEPENDS
)
# Generates config code from filereferences.def for the static library target.
vespa_generate_config(filedistribution_filedistributionmodel filereferences.def)

vespa_add_target_external_dependency(filedistribution_filedistributionmodel zookeeper_mt)
install(FILES filereferences.def DESTINATION var/db/vespa/config_server/serverdb/classes)
// diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp new file mode 100644 index 00000000000..733d60d91bb --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp @@ -0,0 +1,158 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h> +#include "deployedfilestodownload.h" + +#include <sstream> +#include <iterator> + +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> + +#include <vespa/filedistribution/common/logfwd.h> + +using filedistribution::DeployedFilesToDownload; + +typedef std::vector<std::string> StringVector; +typedef boost::filesystem::path Path; + +namespace filedistribution { + +const Path getApplicationIdPath(const Path & parent) { return parent / "appId"; } + +const std::string +readApplicationId(filedistribution::ZKFacade & zk, const Path & deployNode) +{ + if (zk.hasNode(getApplicationIdPath(deployNode))) { + return zk.getString(getApplicationIdPath(deployNode)); + } + return "default:default:default"; +} + +} + +const DeployedFilesToDownload::Path +DeployedFilesToDownload::addNewDeployNode(Path parentPath, const FileReferences& files) { + Path path = parentPath / "deploy_"; + + std::ostringstream filesStream; + if (!files.empty()) { + filesStream << files[0]; + std::for_each(files.begin() +1, files.end(), + filesStream <<boost::lambda::constant('\n') <<boost::lambda::_1); + } + Path retPath = _zk.createSequenceNode(path, filesStream.str().c_str(), filesStream.str().length()); + return retPath; +} + +void +DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath) { + std::map<std::string, StringVector> childrenPerId(groupChildrenByAppId(parentPath, _zk.getChildren(parentPath))); + for (auto & kv : childrenPerId) { + deleteExpiredDeployNodes(parentPath, kv.second); + } +} + +std::map<std::string, StringVector> +DeployedFilesToDownload::groupChildrenByAppId(const Path & parentPath, const StringVector & children) +{ + std::map<std::string, StringVector> childrenById; + std::for_each(std::begin(children), std::end(children), + [&](const std::string & childName) + { + Path childPath = parentPath / childName; + std::string appId(readApplicationId(_zk, childPath)); + childrenById[appId].push_back(childName); + }); + return 
childrenById; +} + +void +DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath, StringVector children) +{ + if (children.size() > numberOfDeploysToKeepFiles) { + std::sort(children.begin(), children.end()); + + size_t numberOfNodesToDelete = children.size() - numberOfDeploysToKeepFiles; + std::for_each(children.begin(), children.begin() + numberOfNodesToDelete, + boost::lambda::bind(&ZKFacade::remove, &_zk, + boost::lambda::ret<Path>(parentPath / boost::lambda::_1))); + } +} + +void +DeployedFilesToDownload::addAppIdToDeployNode(const Path & deployNode, const std::string & appId) +{ + _zk.setData(getApplicationIdPath(deployNode), appId.c_str(), appId.length()); +} + +void +DeployedFilesToDownload::setDeployedFilesToDownload( + const std::string& hostName, + const std::string& applicationId, + const FileReferences& files) { + Path parentPath = getPath(hostName); + _zk.setData(parentPath, "", 0); + + const Path deployNode(addNewDeployNode(parentPath, files)); + addAppIdToDeployNode(deployNode, applicationId); + deleteExpiredDeployNodes(parentPath); +} + +//Nothrow +template <typename INSERT_ITERATOR> +void +DeployedFilesToDownload::readDeployFile(const Path& path, INSERT_ITERATOR insertionIterator) { + LOGFWD(debug, "Reading deploy file '%s", path.string().c_str()); + + + try { + Buffer buffer(_zk.getData(path)); + std::string stringBuffer(buffer.begin(), buffer.end()); + std::istringstream stream(stringBuffer); + + typedef std::istream_iterator<std::string> iterator; + std::copy(iterator(stream), iterator(), insertionIterator); + } catch (const ZKNodeDoesNotExistsException& e) { + //Node deleted, no problem. 
+ LOGFWD(debug, "Deploy file '%s' deleted.", path.string().c_str()); + } +} + +const DeployedFilesToDownload::FileReferences +DeployedFilesToDownload::getDeployedFilesToDownload( + const std::string& hostName, + const ZKFacade::NodeChangedWatcherSP& watcher) { + + try { + StringVector deployedFiles = _zk.getChildren(getPath(hostName), watcher); + FileReferences fileReferences; + + for (StringVector::iterator i = deployedFiles.begin(); i != deployedFiles.end(); ++i) { + readDeployFile(getPath(hostName) / *i, std::back_inserter(fileReferences)); + } + + return fileReferences; + } catch (const ZKNodeDoesNotExistsException&) { + //Add watch waiting for the node to appear: + if (_zk.hasNode(getPath(hostName), watcher)) { + return getDeployedFilesToDownload(hostName, watcher); + } else { + return FileReferences(); + } + } +} + +const DeployedFilesToDownload::FileReferences +DeployedFilesToDownload::getLatestDeployedFilesToDownload(const std::string& hostName) +{ + StringVector deployedFiles = _zk.getChildren(getPath(hostName)); + std::sort(deployedFiles.begin(), deployedFiles.end()); + + FileReferences fileReferences; + if (deployedFiles.empty()) { + return fileReferences; + } else { + readDeployFile(getPath(hostName) / *deployedFiles.rbegin(), std::back_inserter(fileReferences)); + return fileReferences; + } +} diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h new file mode 100644 index 00000000000..aeed0922fc8 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h @@ -0,0 +1,56 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <boost/filesystem/path.hpp> +#include "zkfacade.h" +#include "zkfiledbmodel.h" + +namespace filedistribution { + +const std::string readApplicationId(ZKFacade & zk, const boost::filesystem::path & deployNode); + +class DeployedFilesToDownload { + //includes the current deploy run; + static const size_t numberOfDeploysToKeepFiles = 2; + typedef boost::filesystem::path Path; + + ZKFacade& _zk; + + Path getPath(const std::string& hostName) { + return ZKFileDBModel::_hostsPath / hostName; + } + + //Nothrow + template <typename INSERT_ITERATOR> + void readDeployFile(const boost::filesystem::path& path, INSERT_ITERATOR insertionIterator); + void addAppIdToDeployNode(const Path & deployNode, const std::string & appId); + std::map<std::string, std::vector<std::string> > groupChildrenByAppId(const Path & parentPath, const std::vector<std::string> & children); + void deleteExpiredDeployNodes(Path parentPath, std::vector<std::string> children); + +public: + typedef std::vector<std::string> FileReferences; + + DeployedFilesToDownload(ZKFacade* zk) + :_zk(*zk) + {} + + const Path addNewDeployNode(Path parentPath, const FileReferences& files); + + void deleteExpiredDeployNodes(Path parentPath); + + void setDeployedFilesToDownload( + const std::string& hostName, + const std::string& applicationId, + const FileReferences& files); + + /** For all the deploys available **/ + const FileReferences getDeployedFilesToDownload( + const std::string& hostName, + const ZKFacade::NodeChangedWatcherSP& watcher); + + /** For the current deploy only **/ + const FileReferences getLatestDeployedFilesToDownload(const std::string& hostName); +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/model/filedbmodel.h b/filedistribution/src/vespa/filedistribution/model/filedbmodel.h new file mode 100644 index 00000000000..1c8ad181306 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/filedbmodel.h @@ -0,0 +1,56 @@ 
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <string> +#include <vector> +#include <vespa/filedistribution/common/buffer.h> +#include <vespa/filedistribution/common/exception.h> + +namespace filedistribution { + +struct InvalidProgressException : public Exception { + const char* what() const throw() { + return "Invalid progress information reported by one of the filedistributors"; + } +}; + +struct FileDoesNotExistException : public Exception {}; + +class FileDBModel : boost::noncopyable { +public: + class InvalidHostStatusException : public Exception {}; + struct HostStatus { + enum State { finished, inProgress, notStarted }; + + State _state; + size_t _numFilesToDownload; + size_t _numFilesFinished; + }; + + virtual ~FileDBModel(); + + virtual bool hasFile(const std::string& fileReference) = 0; + virtual void addFile(const std::string& fileReference, const Buffer& buffer) = 0; + virtual Move<Buffer> getFile(const std::string& fileReference) = 0; + virtual void cleanFiles(const std::vector<std::string>& filesToPreserve) = 0; + + virtual void setDeployedFilesToDownload(const std::string& hostName, + const std::string & appId, + const std::vector<std::string> & files) = 0; + virtual void cleanDeployedFilesToDownload( + const std::vector<std::string> & hostsToPreserve, + const std::string& appId) = 0; + virtual void removeDeploymentsThatHaveDifferentApplicationId( + const std::vector<std::string> & hostsToPreserve, + const std::string& appId) = 0; + virtual std::vector<std::string> getHosts() = 0; + + virtual HostStatus getHostStatus(const std::string& hostName) = 0; + //TODO: does not really belong here, refactor. 
+ typedef std::vector<int8_t> Progress; // [0-100] + virtual Progress getProgress(const std::string& fileReference, + const std::vector<std::string>& hostsSortedAscending) = 0; +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h new file mode 100644 index 00000000000..516e183490e --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h @@ -0,0 +1,43 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vector> +#include <memory> +#include <string> +#include <set> + +#include <boost/noncopyable.hpp> +#include <boost/filesystem/path.hpp> +#include <boost/signals2.hpp> + +#include <libtorrent/peer.hpp> +#include <vespa/filedistribution/common/buffer.h> +#include <vespa/filedistribution/common/exception.h> +#include "filedbmodel.h" + +namespace filedistribution { + +class FileDistributionModel : boost::noncopyable { +public: + class NotPeer : public Exception {}; + + typedef boost::signals2::signal<void ()> FilesToDownloadChangedSignal; + typedef std::vector<libtorrent::peer_entry> PeerEntries; + + virtual FileDBModel& getFileDBModel() = 0; + + virtual std::set<std::string> getFilesToDownload() = 0; + + virtual PeerEntries getPeers(const std::string& fileReference, size_t maxPeers) = 0; + virtual void addPeer(const std::string& fileReference) = 0; + virtual void removePeer(const std::string& fileReference) = 0; + virtual void peerFinished(const std::string& fileReference) = 0; //throws NotPeer + + virtual ~FileDistributionModel() {} + + FilesToDownloadChangedSignal _filesToDownloadChanged; +}; + +} //namespace filedistribution + + diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp 
b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp new file mode 100644 index 00000000000..5b9de93249a --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp @@ -0,0 +1,239 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "filedistributionmodel.h" + +#include <vector> +#include <set> +#include <string> +#include <cstdlib> + +#include <boost/filesystem.hpp> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> +#include <zookeeper/zookeeper.h> + +#include <vespa/log/log.h> +LOG_SETUP(".filedistributionmodel"); + +#include "zkfiledbmodel.h" +#include "deployedfilestodownload.h" +#include "filedistributionmodelimpl.h" + +namespace fs = boost::filesystem; + +using filedistribution::ZKFileDBModel; + +namespace { +//peer format: hostName:port + + +void +addPeerEntry(const std::string& peer, + filedistribution::FileDistributionModelImpl::PeerEntries& result) { + + try { + libtorrent::peer_entry peerEntry; + peerEntry.pid.clear(); + + std::istringstream stream(peer); + stream.exceptions ( std::istream::failbit | std::istream::badbit ); + + std::getline(stream, peerEntry.ip, ZKFileDBModel::_peerEntrySeparator); + stream >> peerEntry.port; + + result.push_back(peerEntry); + } catch (const std::exception&) { + LOG(warning, "Invalid peer entry: '%s'", peer.c_str()); + //Ignore invalid peer entries + } +} + +std::vector<std::string>::iterator +prunePeers(std::vector<std::string> &peers, size_t maxPeers) { + if (peers.size() <= maxPeers) + return peers.end(); + + assert (maxPeers < 2147483648); //due to the usage of rand() + + const size_t peersSize = peers.size(); + for (size_t i=0; i<maxPeers; ++i) { + //i <= i + (std::rand() % ( peersSize -i )) <= peerSize - 1 + size_t candidateBetween_i_and_peerSize = i + 
( std::rand() % ( peersSize -i )); + std::swap(peers[i], peers[candidateBetween_i_and_peerSize]); + } + + return peers.begin() + maxPeers; +} + +} //anonymous namespace + +using filedistribution::FileDistributionModelImpl; + +struct FileDistributionModelImpl::DeployedFilesChangedCallback : + public ZKFacade::NodeChangedWatcher +{ + typedef boost::shared_ptr<DeployedFilesChangedCallback> SP; + + boost::weak_ptr<FileDistributionModelImpl> _parent; + + DeployedFilesChangedCallback( + const boost::shared_ptr<FileDistributionModelImpl> & parent) + :_parent(parent) + {} + + //override + void operator()() { + if (boost::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) { + model->_filesToDownloadChanged(); + } + } +}; + + +FileDistributionModelImpl::~FileDistributionModelImpl() { + LOG(debug, "Deconstructing FileDistributionModelImpl"); +} + +FileDistributionModelImpl::PeerEntries +FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t maxPeers) { + try { + fs::path path = _fileDBModel.getPeersPath(fileReference); + + typedef std::vector<std::string> Peers; + Peers peers = _zk->getChildren(path); + // TODO: Take this port from some config somewhere instead of hardcoding + addConfigServersAsPeers(peers, getenv("services__addr_configserver"), 19093); + + Peers::iterator end = prunePeers(peers, maxPeers); + + PeerEntries result; + result.reserve(end - peers.begin()); + + namespace ll=boost::lambda; + std::for_each(peers.begin(), end, + ll::bind(&addPeerEntry, boost::lambda::_1, boost::ref(result))); + + LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str()); + return result; + } catch(ZKNodeDoesNotExistsException&) { + LOG(debug, ("No peer entries available for " + fileReference).c_str()); + return PeerEntries(); + } +} + +fs::path +FileDistributionModelImpl::getPeerEntryPath(const std::string& fileReference) { + std::ostringstream entry; + entry <<_hostName + <<ZKFileDBModel::_peerEntrySeparator <<_port; + + 
return _fileDBModel.getPeersPath(fileReference) / entry.str(); +} + +void +FileDistributionModelImpl::addPeer(const std::string& fileReference) { + fs::path path = getPeerEntryPath(fileReference); + LOG(debug, "Adding peer '%s'", path.string().c_str()); + + if (_zk->hasNode(path)) { + LOG(info, "Retiring previous peer node owner."); + _zk->removeIfExists(path); + } + _zk->addEphemeralNode(path); +} + +void +FileDistributionModelImpl::removePeer(const std::string& fileReference) { + fs::path path = getPeerEntryPath(fileReference); + LOG(debug, "Removing peer '%s'", path.string().c_str()); + + _zk->removeIfExists(path); +} + +//Assumes that addPeer has been called before the torrent was started, +//so that we avoid the race condition between finishing downloading a torrent +//and setting peer status +void +FileDistributionModelImpl::peerFinished(const std::string& fileReference) { + fs::path path = getPeerEntryPath(fileReference); + LOG(debug, "Peer finished '%s'", path.string().c_str()); + + try { + bool mustExist = true; + char progress = 100; //percent + + _zk->setData(path, &progress, sizeof(char), mustExist); + } catch(ZKNodeDoesNotExistsException&) { + BOOST_THROW_EXCEPTION(NotPeer()); + } +} + +std::set<std::string> +FileDistributionModelImpl::getFilesToDownload() { + DeployedFilesToDownload d(_zk.get()); + std::vector<std::string> deployed = d.getDeployedFilesToDownload(_hostName, + DeployedFilesChangedCallback::SP( + new DeployedFilesChangedCallback(shared_from_this()))); + + std::set<std::string> result(deployed.begin(), deployed.end()); + + { + LockGuard guard(_activeFileReferencesMutex); + result.insert(_activeFileReferences.begin(), _activeFileReferences.end()); + } + return result; +} + +bool +FileDistributionModelImpl::updateActiveFileReferences( + const std::vector<vespalib::string>& fileReferences) { + + std::vector<vespalib::string> sortedFileReferences(fileReferences); + std::sort(sortedFileReferences.begin(), sortedFileReferences.end()); + + 
LockGuard guard(_activeFileReferencesMutex); + bool changed = + sortedFileReferences != _activeFileReferences; + + sortedFileReferences.swap(_activeFileReferences); + return changed; +} + +void +FileDistributionModelImpl::addConfigServersAsPeers( + std::vector<std::string> & peers, char const* envConfigServers, int port) { + + std::set<std::string> peersFromTracker(peers.begin(), peers.end()); + + if (envConfigServers == NULL) { + // Could be standalone cluster (not set for this). + return; + } + std::string configServerCommaListed(envConfigServers); + std::stringstream configserverstream(configServerCommaListed); + std::string configserver; + while (std::getline(configserverstream, configserver, ',')) { + configserver += ":" + std::to_string(port); + if (peersFromTracker.find(configserver) == peersFromTracker.end()) { + peers.push_back(configserver); + LOG(debug, "Adding configserver '%s'", configserver.c_str()); + } else { + LOG(debug, "Configserver already added '%s'", configserver.c_str()); + } + } +} + + + +void +FileDistributionModelImpl::configure(std::unique_ptr<FilereferencesConfig> config) { + try { + const bool changed = updateActiveFileReferences(config->filereferences); + if (changed) + _filesToDownloadChanged(); + } catch(...) { + _exceptionRethrower->store(boost::current_exception()); + } +} diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h new file mode 100644 index 00000000000..04a111a00df --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h @@ -0,0 +1,76 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <boost/enable_shared_from_this.hpp> + +#include "filedistributionmodel.h" +#include <vespa/filedistribution/model/config-filereferences.h> +#include "zkfacade.h" +#include "zkfiledbmodel.h" +#include <vespa/filedistribution/common/exceptionrethrower.h> +#include <vespa/config/config.h> + +using cloud::config::filedistribution::FilereferencesConfig; + +namespace filedistribution { + +class FileDistributionModelImpl : public FileDistributionModel, + public config::IFetcherCallback<FilereferencesConfig>, + public boost::enable_shared_from_this<FileDistributionModelImpl> +{ + struct DeployedFilesChangedCallback; + + const std::string _hostName; + const int _port; + + const boost::shared_ptr<ZKFacade> _zk; + ZKFileDBModel _fileDBModel; + + boost::mutex _activeFileReferencesMutex; + typedef boost::lock_guard<boost::mutex> LockGuard; + std::vector<vespalib::string> _activeFileReferences; + + const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + + bool /*changed*/ + updateActiveFileReferences(const std::vector<vespalib::string>& fileReferences); + + ZKFacade::Path getPeerEntryPath(const std::string& fileReference); +public: + FileDistributionModelImpl(const std::string& hostName, int port, + const boost::shared_ptr<ZKFacade>& zk, + const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) + :_hostName(hostName), + _port(port), + _zk(zk), + _fileDBModel(_zk), + _exceptionRethrower(exceptionRethrower) + { + /* Hack: Force the first call to updateActiveFileReferences to return changed=true + when the file references config is empty. + This ensures that the "deployed files to download" nodes in zookeeper are read at start up. 
+ */ + _activeFileReferences.push_back("force-initial-files-to-download-changed-signal"); + } + + ~FileDistributionModelImpl(); + + //overrides FileDistributionModel + FileDBModel& getFileDBModel() { + return _fileDBModel; + } + + std::set<std::string> getFilesToDownload(); + + PeerEntries getPeers(const std::string& fileReference, size_t maxPeers); + void addPeer(const std::string& fileReference); + void removePeer(const std::string& fileReference); + void peerFinished(const std::string& fileReference); + void addConfigServersAsPeers(std::vector<std::string>& peers, char const* envConfigServer, int port); + + //Overrides Subscriber + void configure(std::unique_ptr<FilereferencesConfig> config); +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/model/filereferences.def b/filedistribution/src/vespa/filedistribution/model/filereferences.def new file mode 100644 index 00000000000..cd7212f0821 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/filereferences.def @@ -0,0 +1,3 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +namespace=cloud.config.filedistribution +filereferences[] string diff --git a/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h new file mode 100644 index 00000000000..169225deaf2 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h @@ -0,0 +1,61 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include "filedistributionmodel.h" + +#include <algorithm> +#include <vector> + +namespace filedistribution { + +class MockFileDBModel : public FileDBModel { + std::vector<std::string> _fileReferences; +public: +//Overrides + bool hasFile(const std::string& fileReference) { + return std::find(_fileReferences.begin(), _fileReferences.end(), fileReference) != _fileReferences.end(); + } + + void addFile(const std::string& fileReference, const Buffer & buffer) { + (void)buffer; + _fileReferences.push_back(fileReference); + } + + Move<Buffer> getFile(const std::string& fileReference) { + (void)fileReference; + const char* resultStr = "result"; + Buffer result(resultStr, resultStr + strlen(resultStr)); + return move(result); + } + + virtual void cleanFiles( + const std::vector<std::string> &) {} + + + virtual void setDeployedFilesToDownload(const std::string&, + const std::string&, + const std::vector<std::string> &) {} + virtual void cleanDeployedFilesToDownload( + const std::vector<std::string> &, + const std::string&) {} + virtual void removeDeploymentsThatHaveDifferentApplicationId( + const std::vector<std::string> &, + const std::string&) {} + + virtual std::vector<std::string> getHosts() { + return std::vector<std::string>(); + } + + virtual HostStatus getHostStatus(const std::string&) { + return HostStatus(); + } + + Progress getProgress(const std::string&, + const std::vector<std::string>&) { + return Progress(); + } +}; + + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp new file mode 100644 index 00000000000..71c93be1b98 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp @@ -0,0 +1,648 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> + +#include "zkfacade.h" + +#include <iostream> +#include <unistd.h> +#include <signal.h> +#include <cassert> +#include <cstdio> +#include <sstream> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/throw_exception.hpp> +#include <boost/function_output_iterator.hpp> +#include <boost/thread.hpp> + +#include <zookeeper/zookeeper.h> +#include <vespa/filedistribution/common/logfwd.h> +#include <vespa/defaults.h> + +typedef boost::unique_lock<boost::mutex> UniqueLock; + +using filedistribution::ZKFacade; +using filedistribution::Move; +using filedistribution::Buffer; +using filedistribution::ZKGenericException; + +typedef ZKFacade::Path Path; + +namespace { + +class RetryController { + unsigned int _retryCount; + ZKFacade& _zkFacade; + + static const unsigned int _maxRetries = 10; +public: + int _lastStatus; + + RetryController(ZKFacade* zkFacade) + :_retryCount(0), + _zkFacade(*zkFacade), + _lastStatus(0) + {} + + void operator()(int status) { + _lastStatus = status; + } + + bool shouldRetry() { + ++_retryCount; + + return _zkFacade.retriesEnabled() && + _lastStatus != ZOK && + _retryCount < _maxRetries && + isNonLastingError(_lastStatus) && + pause(); + } + + bool isNonLastingError(int error) { + return error == ZCONNECTIONLOSS || + error == ZOPERATIONTIMEOUT; + } + + bool pause() { + unsigned int sleepInSeconds = 1; + sleep(sleepInSeconds); + LOGFWD(info, "Retrying zookeeper operation."); + return true; + } + + void throwIfError() { + namespace fd = filedistribution; + + switch (_lastStatus) { + case ZSESSIONEXPIRED: + BOOST_THROW_EXCEPTION(fd::ZKSessionExpired()); + case ZNONODE: + BOOST_THROW_EXCEPTION(fd::ZKNodeDoesNotExistsException()); + case ZNODEEXISTS: + BOOST_THROW_EXCEPTION(fd::ZKNodeExistsException()); + default: + if (_lastStatus != ZOK) + BOOST_THROW_EXCEPTION(fd::ZKGenericException(_lastStatus)); + } + } +}; + +class DeallocateZKStringVectorGuard { + String_vector& _strings; +public: 
+ DeallocateZKStringVectorGuard(String_vector& strings) + :_strings(strings) + {} + + ~DeallocateZKStringVectorGuard() { + deallocate_String_vector(&_strings); + } +}; + +const Path +setDataForNewFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle, int createFlags) { + + RetryController retryController(&zk); + const int maxPath = 1024; + char createdPath[maxPath]; + do { + retryController( + zoo_create(zhandle, path.string().c_str(), + buffer, length, + &ZOO_OPEN_ACL_UNSAFE, createFlags, + createdPath, maxPath)); + } while (retryController.shouldRetry()); + + retryController.throwIfError(); + return Path(createdPath); +} + +void +setDataForExistingFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle) { + RetryController retryController(&zk); + + const int ignoreVersion = -1; + do { + retryController( + zoo_set(zhandle, path.string().c_str(), + buffer, length, ignoreVersion)); + } while (retryController.shouldRetry()); + + retryController.throwIfError(); +} + +} //anonymous namespace + +/********** Active watchers *******************************************/ +struct ZKFacade::ZKWatcher { + const boost::weak_ptr<ZKFacade> _owner; + const NodeChangedWatcherSP _nodeChangedWatcher; + + ZKWatcher( + const boost::shared_ptr<ZKFacade> &owner, + const NodeChangedWatcherSP& nodeChangedWatcher ) + :_owner(owner), + _nodeChangedWatcher(nodeChangedWatcher) + {} + + static void watcherFn(zhandle_t *zh, int type, + int state, const char *path,void *watcherContext) { + + (void)zh; + (void)state; + (void)path; + + if (type == ZOO_SESSION_EVENT) { + //The session events do not cause unregistering of the watcher + //inside zookeeper, so don't unregister it here. + LOGFWD(debug, "ZKWatcher recieved session event with state '%d'. 
Ignoring", state); + return; + } + + LOGFWD(debug, "ZKWatcher: Begin watcher called for path '%s' with type %d.", path, type); + + ZKWatcher* self = static_cast<ZKWatcher*>(watcherContext); + + //WARNING: Since we're creating a shared_ptr to ZKFacade here, this might cause + //destruction of the ZKFacade in a zookeeper thread. + //Since zookeeper_close blocks until all watcher threads are finished, and we're inside a watcher thread, + //this will cause infinite waiting. + //To avoid this, a custom shared_ptr deleter using a separate deleter thread must be used. + + if (boost::shared_ptr<ZKFacade> zk = self->_owner.lock()) { + zk->invokeWatcher(watcherContext); + } + + LOGFWD(debug, "ZKWatcher: End watcher called for path '%s' with type %d.", path, type); + } +}; + +void +ZKFacade::stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context) { + (void)path; + + //The ZKFacade won't expire before zookeeper_close has finished. + ZKFacade* self = (ZKFacade*)context; + if (type == ZOO_SESSION_EVENT) { + LOGFWD(debug, "Zookeeper session event: %d", state); + if (state == ZOO_EXPIRED_SESSION_STATE) { + self->_exceptionRethrower->store(ZKSessionExpired()); + } else if (state == ZOO_AUTH_FAILED_STATE) { + self->_exceptionRethrower->store(ZKGenericException(ZNOAUTH)); + } + } else { + LOGFWD(info, "State watching function: Unexpected event: '%d' -- '%d' ", type, state); + } +} + + +void* /* watcherContext */ +ZKFacade::registerWatcher(const NodeChangedWatcherSP& watcher) { + + UniqueLock lock(_watchersMutex); + boost::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher)); + _watchers[zkWatcher.get()] = zkWatcher; + return zkWatcher.get(); +} + + +boost::shared_ptr<ZKFacade::ZKWatcher> +ZKFacade::unregisterWatcher(void* watcherContext) { + UniqueLock lock(_watchersMutex); + + WatchersMap::iterator i = _watchers.find(watcherContext); + if (i == _watchers.end()) { + return boost::shared_ptr<ZKWatcher>(); + } else { + 
boost::shared_ptr<ZKWatcher> result = i->second; + _watchers.erase(i); + return result; + } +} + +void +ZKFacade::invokeWatcher(void* watcherContext) { + try { + boost::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext); + + if (!_watchersEnabled) + return; + + if (watcher) { + (*watcher->_nodeChangedWatcher)(); + } else { + LOGFWD(error, "Invoke called on expired watcher."); + } + } catch(...) { + _exceptionRethrower->store(boost::current_exception()); + } +} + +/********** End live watchers ***************************************/ + + +ZKFacade::ZKFacade(const std::string& zkservers, + const boost::shared_ptr<ExceptionRethrower> &exceptionRethrower) + :_retriesEnabled(true), + _watchersEnabled(true), + _exceptionRethrower(exceptionRethrower), + _zhandle(zookeeper_init(zkservers.c_str(), + &ZKFacade::stateWatchingFun, + _zkSessionTimeOut, + 0, //clientid, + this, //context, + 0)) //flags +{ + if (!_zhandle) { + BOOST_THROW_EXCEPTION(ZKFailedConnecting()); + } +} + +ZKFacade::~ZKFacade() { + disableRetries(); + _watchersEnabled = false; + + boost::thread shutdownCaller(zookeeper_close, _zhandle); + if (shutdownCaller.timed_join(boost::posix_time::seconds(120))) { + LOGFWD(debug, "Zookeeper connection closed successfully."); + } else { + LOGFWD(info, "Timed out waiting for the zookeeper connection to shut down."); + abort(); + } +} + +const std::string +ZKFacade::getString(const Path& path) { + Buffer buffer(getData(path)); + return std::string(buffer.begin(), buffer.end()); +} + +const Move<Buffer> +ZKFacade::getData(const Path& path) { + RetryController retryController(this); + try { + Buffer buffer(_maxDataSize); + int bufferSize = _maxDataSize; + + const int watchIsOff = 0; + do { + Stat stat; + bufferSize = _maxDataSize; + + retryController( + zoo_get(_zhandle, path.string().c_str(), watchIsOff, + &*buffer.begin(), + &bufferSize, //in & out + &stat)); + } while(retryController.shouldRetry()); + + retryController.throwIfError(); + + 
buffer.resize(bufferSize); + return move(buffer); + + } catch(boost::exception& e) { + e <<errorinfo::Path(path); + throw; + } +} + +const Move<Buffer> +ZKFacade::getData(const Path& path, const NodeChangedWatcherSP& watcher) { + void* watcherContext = registerWatcher(watcher); + RetryController retryController(this); + + try { + Buffer buffer(_maxDataSize); + int bufferSize = _maxDataSize; + + do { + Stat stat; + bufferSize = _maxDataSize; + + retryController( + zoo_wget(_zhandle, path.string().c_str(), + &ZKWatcher::watcherFn, watcherContext, + &*buffer.begin(), + &bufferSize, //in & out + &stat)); + } while(retryController.shouldRetry()); + + retryController.throwIfError(); + + buffer.resize(bufferSize); + return move(buffer); + + } catch(boost::exception& e) { + unregisterWatcher(watcherContext); + e <<errorinfo::Path(path); + throw; + } +} + +void +ZKFacade::setData(const Path& path, const Buffer& buffer, bool mustExist) { + return setData(path, &*buffer.begin(), buffer.size(), mustExist); +} + +void +ZKFacade::setData(const Path& path, const char* buffer, size_t length, bool mustExist) { + assert (length < _maxDataSize); + + try { + if (mustExist || hasNode(path)) + setDataForExistingFile(*this, path, buffer, length, _zhandle); + else + setDataForNewFile(*this, path, buffer, length, _zhandle, 0); + } catch(boost::exception& e) { + e <<errorinfo::Path(path); + throw; + } +} + +const Path +ZKFacade::createSequenceNode(const Path& path, const char* buffer, size_t length) { + assert (length < _maxDataSize); + + int createFlags = ZOO_SEQUENCE; + return setDataForNewFile(*this, path, buffer, length, _zhandle, createFlags); +} + +bool +ZKFacade::hasNode(const Path& path) { + try { + RetryController retryController(this); + do { + Stat stat; + const int noWatch = 0; + retryController( + zoo_exists(_zhandle, path.string().c_str(), noWatch, &stat)); + } while(retryController.shouldRetry()); + + switch(retryController._lastStatus) { + case ZNONODE: + return false; + 
case ZOK: + return true; + default: + retryController.throwIfError(); + //this should never happen: + assert(false); + return false; + } + + } catch (boost::exception &e) { + e <<errorinfo::Path(path); + throw; + } +} + +bool +ZKFacade::hasNode(const Path& path, const NodeChangedWatcherSP& watcher) { + void* watcherContext = registerWatcher(watcher); + try { + RetryController retryController(this); + do { + Stat stat; + retryController( + zoo_wexists(_zhandle, path.string().c_str(), + &ZKWatcher::watcherFn, watcherContext, + &stat)); + } while(retryController.shouldRetry()); + + switch(retryController._lastStatus) { + case ZNONODE: + return false; + case ZOK: + return true; + default: + retryController.throwIfError(); + //this should never happen: + assert(false); + return false; + } + + } catch (boost::exception &e) { + unregisterWatcher(watcherContext); + e <<errorinfo::Path(path); + throw; + } +} + +void +ZKFacade::addEphemeralNode(const Path& path) { + try { + setDataForNewFile(*this, path, "", 0, _zhandle, ZOO_EPHEMERAL); + } catch(const ZKNodeExistsException& e) { + remove(path); + addEphemeralNode(path); + } catch (boost::exception& e) { + e <<errorinfo::Path(path); + throw; + } +} + +void +ZKFacade::remove(const Path& path) { + namespace ll = boost::lambda; + + std::vector< std::string > children = getChildren(path); + if (!children.empty()) { + std::for_each(children.begin(), children.end(), + ll::bind(&ZKFacade::remove, this, + ll::ret<Path>(path / ll::_1))); + } + + try { + RetryController retryController(this); + do { + int ignoreVersion = -1; + + retryController( + zoo_delete(_zhandle, path.string().c_str(), + ignoreVersion)); + } while (retryController.shouldRetry()); + + if (retryController._lastStatus != ZNONODE) + retryController.throwIfError(); + + } catch(boost::exception& e) { + e <<errorinfo::Path(path); + } +} + +void +ZKFacade::removeIfExists(const Path& path) { + try { + if (hasNode(path)) { + remove(path); + } + } catch (const 
ZKNodeDoesNotExistsException& e) { + //someone else removed it concurrently, not a problem. + } +} + +void +ZKFacade::retainOnly(const Path& path, const std::vector<std::string>& childrenToPreserve) { + typedef std::vector<std::string> Children; + + Children current = getChildren(path); + std::sort(current.begin(), current.end()); + + Children toPreserveSorted(childrenToPreserve); + std::sort(toPreserveSorted.begin(), toPreserveSorted.end()); + + namespace ll = boost::lambda; + std::set_difference(current.begin(), current.end(), + toPreserveSorted.begin(), toPreserveSorted.end(), + boost::make_function_output_iterator( + ll::bind(&ZKFacade::remove, this, + ll::ret<Path>(path / ll::_1)))); +} + +std::vector< std::string > +ZKFacade::getChildren(const Path& path) { + try { + RetryController retryController(this); + String_vector children; + do { + const bool watch = false; + retryController( + zoo_get_children(_zhandle, path.string().c_str(), watch, &children)); + } while(retryController.shouldRetry()); + + retryController.throwIfError(); + + DeallocateZKStringVectorGuard deallocateGuard(children); + + typedef std::vector<std::string> ResultType; + ResultType result; + result.reserve(children.count); + + std::copy(children.data, children.data + children.count, + std::back_inserter(result)); + + return result; + } catch (boost::exception& e) { + e <<errorinfo::Path(path); + throw; + } +} + +std::vector< std::string > +ZKFacade::getChildren(const Path& path, const NodeChangedWatcherSP& watcher) { + void* watcherContext = registerWatcher(watcher); + + try { + RetryController retryController(this); + String_vector children; + do { + retryController( + zoo_wget_children(_zhandle, path.string().c_str(), + &ZKWatcher::watcherFn, watcherContext, + &children)); + } while(retryController.shouldRetry()); + + retryController.throwIfError(); + + DeallocateZKStringVectorGuard deallocateGuard(children); + + typedef std::vector<std::string> ResultType; + ResultType result; + 
result.reserve(children.count); + + std::copy(children.data, children.data + children.count, + std::back_inserter(result)); + + return result; + } catch (boost::exception& e) { + unregisterWatcher(watcherContext); + e <<errorinfo::Path(path); + throw; + } +} + + +void +ZKFacade::disableRetries() { + _retriesEnabled = false; +} + +void +filedistribution::setupZooKeeperLogging() { + std::string filename(vespa::Defaults::vespaHome()); + filename.append("/tmp/zookeeper.log"); + FILE* file = std::fopen(filename.c_str(), "w"); + if (file == NULL) { + std::cerr <<"Could not open file " <<filename << std::endl; + } else { + zoo_set_log_stream(file); + } + + zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); +} + +const char* +ZKGenericException::what() const throw() { + switch(_zkStatus) { + //System errors + case ZRUNTIMEINCONSISTENCY: + return "Zookeeper: A runtime inconsistency was found(ZRUNTIMEINCONSISTENCY)"; + case ZDATAINCONSISTENCY: + return "Zookeeper: A data inconsistency was found(ZDATAINCONSISTENCY)"; + case ZCONNECTIONLOSS: + return "Zookeeper: Connection to the server has been lost(ZCONNECTIONLOSS)"; + case ZMARSHALLINGERROR: + return "Zookeeper: Error while marshalling or unmarshalling data(ZMARSHALLINGERROR)"; + case ZUNIMPLEMENTED: + return "Zookeeper: Operation is unimplemented(ZUNIMPLEMENTED)"; + case ZOPERATIONTIMEOUT: + return "Zookeeper: Operation timeout(ZOPERATIONTIMEOUT)"; + case ZBADARGUMENTS: + return "Zookeeper: Invalid arguments(ZBADARGUMENTS)"; + case ZINVALIDSTATE: + return "Zookeeper: The connection with the zookeeper servers timed out(ZINVALIDSTATE)."; + + //API errors + case ZNONODE: + return "Zookeeper: Node does not exist(ZNONODE)"; + case ZNOAUTH: + return "Zookeeper: Not authenticated(ZNOAUTH)"; + case ZBADVERSION: + return "Zookeeper: Version conflict(ZBADVERSION)"; + case ZNOCHILDRENFOREPHEMERALS: + return "Zookeeper: Ephemeral nodes may not have children(ZNOCHILDRENFOREPHEMERALS)"; + case ZNODEEXISTS: + return "Zookeeper: The node already 
exists(ZNODEEXISTS)"; + case ZNOTEMPTY: + return "Zookeeper: The node has children(ZNOTEMPTY)"; + case ZSESSIONEXPIRED: + return "Zookeeper: The session has been expired by the server(ZSESSIONEXPIRED)"; + case ZINVALIDCALLBACK: + return "Zookeeper: Invalid callback specified(ZINVALIDCALLBACK)"; + case ZINVALIDACL: + return "Zookeeper: Invalid ACL specified(ZINVALIDACL)"; + case ZAUTHFAILED: + return "Zookeeper: Client authentication failed(ZAUTHFAILED)"; + case ZCLOSING: + return "Zookeeper: ZooKeeper is closing(ZCLOSING)"; + case ZNOTHING: + return "Zookeeper: No server responses to process(ZNOTHING)"; + default: + std::cerr <<"In ZKGenericException::what(): Invalid error code " <<_zkStatus <<std::endl; + return "Zookeeper: Invalid error code."; + } +} + +const std::string +filedistribution::diagnosticUserLevelMessage(const ZKException& exception) { + const char* indent = " "; + std::ostringstream message; + message <<exception.what(); + + const errorinfo::Path::value_type* path = boost::get_error_info<errorinfo::Path>(exception); + if (path) { + message <<std::endl <<indent <<"Path: " <<*path; + } + return message.str(); +} diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.h b/filedistribution/src/vespa/filedistribution/model/zkfacade.h new file mode 100644 index 00000000000..7db6b53cc18 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.h @@ -0,0 +1,135 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <string> +#include <vector> +#include <boost/noncopyable.hpp> +#include <boost/filesystem/path.hpp> +#include <boost/signals2.hpp> +#include <boost/enable_shared_from_this.hpp> + +#include <vespa/filedistribution/common/buffer.h> +#include <vespa/filedistribution/common/exception.h> +#include <vespa/filedistribution/common/exceptionrethrower.h> + +struct _zhandle; +typedef _zhandle zhandle_t; + +namespace filedistribution { + +namespace errorinfo { +typedef boost::error_info<struct tag_Path, boost::filesystem::path> Path; +} + +class ZKException : public Exception { +protected: + ZKException() {} +}; + +struct ZKNodeDoesNotExistsException : public ZKException { + const char* what() const throw() { + return "Zookeeper: The node does not exist(ZNONODE)."; + } +}; + +struct ZKNodeExistsException : public ZKException { + const char* what() const throw() { + return "Zookeeper: The node already exists(ZNODEEXISTS)."; + } +}; + +struct ZKGenericException : public ZKException { + const int _zkStatus; + ZKGenericException(int zkStatus) + :_zkStatus(zkStatus) + {} + + const char* what() const throw(); +}; + +struct ZKFailedConnecting : public ZKException { + const char* what() const throw() { + return "Zookeeper: Failed connecting to the zookeeper servers."; + } +}; + +class ZKSessionExpired : public ZKException {}; + +const std::string +diagnosticUserLevelMessage(const ZKException& zk); + + + +class ZKFacade : boost::noncopyable, public boost::enable_shared_from_this<ZKFacade> { + volatile bool _retriesEnabled; + volatile bool _watchersEnabled; + + boost::shared_ptr<ExceptionRethrower> _exceptionRethrower; + zhandle_t* _zhandle; + const static int _zkSessionTimeOut = 30 * 1000; + const static size_t _maxDataSize = 1024 * 1024; + + class ZKWatcher; + static void stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context); +public: + typedef boost::shared_ptr<ZKFacade> SP; + + /* Lifetime is managed by ZKFacade. 
+ Derived classes should only contain weak_ptrs to other objects + to avoid linking their lifetime to the ZKFacade lifetime. + */ + class NodeChangedWatcher : boost::noncopyable { + public: + virtual void operator()() = 0; + virtual ~NodeChangedWatcher() {}; + }; + + typedef boost::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP; + typedef boost::filesystem::path Path; + + ZKFacade(const std::string& zkservers, const boost::shared_ptr<ExceptionRethrower> &); + ~ZKFacade(); + + bool hasNode(const Path&); + bool hasNode(const Path&, const NodeChangedWatcherSP&); + + const std::string getString(const Path&); + const Move<Buffer> getData(const Path&); //throws ZKNodeDoesNotExistsException + //if watcher is specified, it will be set even if the node does not exists + const Move<Buffer> getData(const Path&, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException + + //Parent path must exist + void setData(const Path&, const Buffer& buffer, bool mustExist = false); + void setData(const Path&, const char* buffer, size_t length, bool mustExist = false); + + const Path createSequenceNode(const Path&, const char* buffer, size_t length); + + void remove(const Path&); //throws ZKNodeDoesNotExistsException + void removeIfExists(const Path&); + + void retainOnly(const Path&, const std::vector<std::string>& children); + + void addEphemeralNode(const Path& path); + std::vector<std::string> getChildren(const Path& path); + std::vector<std::string> getChildren(const Path& path, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException + + //only for use by shutdown code. 
+ void disableRetries(); + bool retriesEnabled() { + return _retriesEnabled; + } + +private: + void* registerWatcher(const NodeChangedWatcherSP &); //returns watcherContext + boost::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext); + void invokeWatcher(void* watcherContext); + + boost::mutex _watchersMutex; + typedef std::map<void*, boost::shared_ptr<ZKWatcher> > WatchersMap; + WatchersMap _watchers; +}; + +void setupZooKeeperLogging(); + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp new file mode 100644 index 00000000000..3483a4eb359 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp @@ -0,0 +1,298 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include "filedistributionmodel.h" + +#include <ostream> +#include <algorithm> +#include <boost/lambda/lambda.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/foreach.hpp> + +#include "zkfacade.h" +#include "zkfiledbmodel.h" +#include "deployedfilestodownload.h" +#include <vespa/filedistribution/common/logfwd.h> + +namespace fs = boost::filesystem; + +using filedistribution::ZKFileDBModel; + +namespace { + +fs::path +createPath(const std::string& fileReference) { + return filedistribution::ZKFileDBModel::_fileDBPath / fileReference; +} + +void +createNode(const fs::path & path, filedistribution::ZKFacade& zk) { + if (!zk.hasNode(path)) + zk.setData(path, "", 0); +} + +bool +isEntryForHost(const std::string& host, const std::string& peerEntry) { + return host.size() < peerEntry.size() && + std::equal(host.begin(), host.end(), peerEntry.begin()) && + peerEntry[host.size()] == ZKFileDBModel::_peerEntrySeparator; +} + +std::vector<std::string> +getSortedChildren(filedistribution::ZKFacade& zk, const ZKFileDBModel::Path& path) { + 
std::vector<std::string> children = zk.getChildren(path); + std::sort(children.begin(), children.end()); + return children; +} + +} //anonymous namespace + +const ZKFileDBModel::Path ZKFileDBModel::_root = "/vespa/filedistribution"; +const ZKFileDBModel::Path ZKFileDBModel::_fileDBPath = _root / "files"; +const ZKFileDBModel::Path ZKFileDBModel::_hostsPath = _root / "hosts"; + +bool +ZKFileDBModel::hasFile(const std::string& fileReference) { + return _zk->hasNode(createPath(fileReference)); +} + +void +ZKFileDBModel::addFile(const std::string& fileReference, const Buffer& buffer) { + return _zk->setData(createPath(fileReference), buffer); +} + +filedistribution::Move<filedistribution::Buffer> +ZKFileDBModel::getFile(const std::string& fileReference) { + try { + return _zk->getData(createPath(fileReference)); + } catch(const ZKNodeDoesNotExistsException&) { + throw FileDoesNotExistException(); + } +} + +void +ZKFileDBModel::setDeployedFilesToDownload( + const std::string& hostName, + const std::string& appId, + const std::vector<std::string>& files) { + DeployedFilesToDownload d(_zk.get()); + d.setDeployedFilesToDownload(hostName, appId, files); +} + +void +ZKFileDBModel::cleanDeployedFilesToDownload( + const std::vector<std::string>& hostsToPreserve, + const std::string& appId) { + + std::vector<std::string> allHosts = getHosts(); + std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end()); + + for (auto & host : allHosts) { + Path hostPath = _hostsPath / host; + try { + removeLegacyDeployFileNodes(hostPath); + // If this host is NOT part of hosts to deploy to + if (toPreserve.find(host) == toPreserve.end()) { + removeDeployFileNodes(hostPath, appId); + if (canRemoveHost(hostPath, appId)) { + _zk->remove(hostPath); + } + } + } catch (const ZKNodeDoesNotExistsException& e) { + LOGFWD(debug, "Host '%s' changed. 
Not touching", hostPath.string().c_str()); + } + } +} + +void +ZKFileDBModel::removeDeploymentsThatHaveDifferentApplicationId( + const std::vector<std::string>& hostsToPreserve, + const std::string& appId) { + + std::vector<std::string> allHosts = getHosts(); + std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end()); + + for (auto & host : allHosts) { + Path hostPath = _hostsPath / host; + try { + if (toPreserve.find(host) != toPreserve.end()) { + removeNonApplicationFiles(hostPath, appId); + } + } catch (const ZKNodeDoesNotExistsException& e) { + LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str()); + } + } +} + + +// Delete files which do not belong to this application. +void +ZKFileDBModel::removeNonApplicationFiles(const Path & hostPath, const std::string& appId) +{ + std::vector<std::string> deployNodes = _zk->getChildren(hostPath); + for (auto & deployNode : deployNodes) { + Path deployNodePath = hostPath / deployNode; + std::string applicationId(readApplicationId(*_zk, deployNodePath)); + if (appId != applicationId) { + _zk->remove(deployNodePath); + } + } +} + + +void +ZKFileDBModel::removeLegacyDeployFileNodes(const Path & hostPath) +{ + std::vector<std::string> deployNodes = _zk->getChildren(hostPath); + for (auto & deployNode : deployNodes) { + Path deployNodePath = hostPath / deployNode; + std::string applicationId(readApplicationId(*_zk, deployNodePath)); + size_t numParts = std::count(applicationId.begin(), applicationId.end(), ':'); + // If we have an id with 3 colons, it is a legacy id and can be deleted. 
+ if (numParts == 3) { + _zk->remove(deployNodePath); + } + } +} + +void +ZKFileDBModel::removeDeployFileNodes(const Path & hostPath, const std::string& appId) { + std::vector<std::string> deployNodes = _zk->getChildren(hostPath); + for (auto & deployNode : deployNodes) { + Path deployNodePath = hostPath / deployNode; + std::string applicationId(readApplicationId(*_zk, deployNodePath)); + if (appId == applicationId) { + _zk->remove(deployNodePath); + } + } +} + +bool +ZKFileDBModel::canRemoveHost(const Path & hostPath, const std::string& appId) { + std::vector<std::string> deployNodes = _zk->getChildren(hostPath); + for (auto & deployNode : deployNodes) { + Path deployNodePath = hostPath / deployNode; + std::string applicationId(readApplicationId(*_zk, deployNodePath)); + if (appId != applicationId) { + return false; + } + } + return true; +} + +std::vector<std::string> +ZKFileDBModel::getHosts() { + try { + return _zk->getChildren(_hostsPath); + } catch(ZKNodeDoesNotExistsException&) { + LOGFWD(debug, "No files to be distributed."); + return std::vector<std::string>(); + } +} + +namespace { +const ZKFileDBModel::Progress::value_type notStarted = 101; +}; + +//TODO: Refactor +ZKFileDBModel::HostStatus +ZKFileDBModel::getHostStatus(const std::string& hostName) { + typedef std::vector<std::string> PeerEntries; + + DeployedFilesToDownload d(_zk.get()); + DeployedFilesToDownload::FileReferences filesToDownload = d.getLatestDeployedFilesToDownload(hostName); + + HostStatus hostStatus; + hostStatus._state = HostStatus::notStarted; + hostStatus._numFilesToDownload = filesToDownload.size(); + hostStatus._numFilesFinished = 0; + + BOOST_FOREACH(std::string file, filesToDownload) { + Path path = getPeersPath(file); + + const PeerEntries peerEntries = getSortedChildren(*_zk, path); + PeerEntries::const_iterator candidate = + std::lower_bound(peerEntries.begin(), peerEntries.end(), hostName); + + if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate)) { + 
char fileProgressPercentage = getProgress(path / (*candidate)); + if (fileProgressPercentage == 100) { + hostStatus._numFilesFinished++; + } else if (fileProgressPercentage != notStarted) { + hostStatus._state = HostStatus::inProgress; + } + + candidate++; + if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate)) + BOOST_THROW_EXCEPTION(InvalidHostStatusException()); + } + } + + + if (hostStatus._numFilesToDownload == hostStatus._numFilesFinished) { + hostStatus._state = HostStatus::finished; + } + + return hostStatus; +} + +void +ZKFileDBModel::cleanFiles( + const std::vector<std::string>& filesToPreserve) { + _zk->retainOnly(_fileDBPath, filesToPreserve); +} + +ZKFileDBModel::ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk) + : _zk(zk) +{ + createNode(_root, *_zk); + createNode(_fileDBPath, *_zk); + createNode(_hostsPath, *_zk); +} + + +char +ZKFileDBModel::getProgress(const Path& path) { + try { + Buffer buffer(_zk->getData(path)); + if (buffer.size() == 1) + return buffer[0]; + else if (buffer.size() == 0) + return 0; + else { + throw boost::enable_current_exception(InvalidProgressException()) + <<errorinfo::Path(path); + } + } catch (ZKNodeDoesNotExistsException& e) { + //progress information deleted + return notStarted; + } +} + +ZKFileDBModel::Progress +ZKFileDBModel::getProgress(const std::string& fileReference, + const std::vector<std::string>& hostsSortedAscending) { + Path path = getPeersPath(fileReference); + + Progress progress; + progress.reserve(hostsSortedAscending.size()); + + typedef std::vector<std::string> PeerEntries; + const PeerEntries peerEntries = getSortedChildren(*_zk, path); + + PeerEntries::const_iterator current = peerEntries.begin(); + BOOST_FOREACH(const std::string& host, hostsSortedAscending) { + PeerEntries::const_iterator candidate = + std::lower_bound(current, peerEntries.end(), host); + + ZKFileDBModel::Progress::value_type hostProgress = notStarted; + if (candidate != peerEntries.end()) { + current = 
candidate; + if (isEntryForHost(host, *current)) + hostProgress = getProgress(path / (*candidate)); + } + progress.push_back(hostProgress); + } + return progress; +} + +filedistribution::FileDBModel::~FileDBModel() {} diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h new file mode 100644 index 00000000000..cf180f4c780 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h @@ -0,0 +1,59 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/shared_ptr.hpp> + +#include "filedistributionmodel.h" +#include "zkfacade.h" + +namespace filedistribution { + +class ZKFileDBModel : public FileDBModel { +public: + typedef boost::filesystem::path Path; +private: + const boost::shared_ptr<ZKFacade> _zk; + char getProgress(const Path& path); + void removeDeployFileNodes(const Path& hostPath, const std::string& appId); + void removeLegacyDeployFileNodes(const Path& hostPath); + bool canRemoveHost(const Path& hostPath, const std::string& appId); +public: + const static Path _root; + const static Path _fileDBPath; + const static Path _hostsPath; + + const static char _peerEntrySeparator = ':'; + + Path getPeersPath(const std::string& fileReference) { + return _fileDBPath / fileReference; + } + + //overrides + bool hasFile(const std::string& fileReference); + void addFile(const std::string& fileReference, const Buffer& buffer); + Move<Buffer> getFile(const std::string& fileReference); + void cleanFiles(const std::vector<std::string>& filesToPreserve); + + void setDeployedFilesToDownload(const std::string& hostName, + const std::string & appId, + const std::vector<std::string> & files); + void cleanDeployedFilesToDownload( + const std::vector<std::string> & hostsToPreserve, + const std::string& appId); + void removeDeploymentsThatHaveDifferentApplicationId( + const 
std::vector<std::string> & hostsToPreserve, + const std::string& appId); + void removeNonApplicationFiles( + const Path & hostPath, + const std::string& appId); + std::vector<std::string> getHosts(); + HostStatus getHostStatus(const std::string& hostName); + + ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk); + + Progress getProgress(const std::string& fileReference, + const std::vector<std::string>& hostsSortedAscending); +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt new file mode 100644 index 00000000000..d3691ef4d84 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(filedistribution_filedistributorrpc STATIC + SOURCES + filedistributorrpc.cpp + DEPENDS +) diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp new file mode 100644 index 00000000000..e58165f1f0f --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp @@ -0,0 +1,279 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include <vespa/fastos/fastos.h> +#include "filedistributorrpc.h" + +#include <boost/noncopyable.hpp> +#include <boost/optional.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/locks.hpp> +#include <boost/lambda/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/exception/diagnostic_information.hpp> + +#include <vespa/log/log.h> +LOG_SETUP(".filedistributorrpc"); + +#include <vespa/fnet/frt/frt.h> +#include <vespa/frtstream/frtserverstream.h> +#include <map> + +#include "fileprovider.h" +#include <vespa/filedistribution/model/filedbmodel.h> + +using filedistribution::FileDistributorRPC; +namespace ll = boost::lambda; + +namespace { +typedef boost::lock_guard<boost::mutex> LockGuard; + +struct RPCErrorCodes { + const static uint32_t baseErrorCode = 0x10000; + const static uint32_t baseFileProviderErrorCode = baseErrorCode + 0x1000; + + const static uint32_t unknownError = baseErrorCode + 1; +}; + +class QueuedRequests { + bool _shuttingDown; + + boost::mutex _mutex; + typedef std::multimap<std::string, FRT_RPCRequest*> Map; + Map _queuedRequests; + + template <class FUNC> + void returnAnswer(const std::string& fileReference, FUNC func) { + LockGuard guard(_mutex); + + typedef Map::iterator iterator; + std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference); + + BOOST_FOREACH( const Map::value_type& request, range) { + LOG(info, "Returning earlier enqueued request for file reference '%s'.", request.first.c_str()); + func(*request.second); + request.second->Return(); + } + + _queuedRequests.erase(range.first, range.second); + } + + struct DownloadFinished { + const std::string& _path; + + void operator()(FRT_RPCRequest& request) { + LOG(info, "Download finished: '%s'", _path.c_str()); + frtstream::FrtServerStream requestHandler(&request); + requestHandler <<_path; + } + + DownloadFinished(const std::string& path) + :_path(path) + {} + }; + + struct DownloadFailed { + filedistribution::FileProvider::FailedDownloadReason 
_reason; + + void operator()(FRT_RPCRequest& request) { + LOG(info, "Download failed: '%d'", _reason); + request.SetError(RPCErrorCodes::baseFileProviderErrorCode + _reason, "Download failed"); + } + + DownloadFailed(filedistribution::FileProvider::FailedDownloadReason reason) + :_reason(reason) + {} + }; + +public: + QueuedRequests() + :_shuttingDown(false) + {} + + void enqueue(const std::string& fileReference, FRT_RPCRequest* request) { + LockGuard guard(_mutex); + + if (_shuttingDown) { + LOG(info, "Shutdown: Aborting request for file reference '%s'.", fileReference.c_str()); + abort(request); + } else { + _queuedRequests.insert(std::make_pair(fileReference, request)); + } + } + + void abort(FRT_RPCRequest* request) { + request->SetError(FRTE_RPC_ABORT); + request->Return(); + } + + void dequeue(const std::string& fileReference, FRT_RPCRequest* request) { + LockGuard guard(_mutex); + + typedef Map::iterator iterator; + std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference); + + iterator candidate = std::find(range.first, range.second, + std::pair<const std::string, FRT_RPCRequest*>(fileReference, request)); + + if (candidate != range.second) + _queuedRequests.erase(candidate); + } + + void downloadFinished(const std::string& fileReference, + const boost::filesystem::path& path) { + + DownloadFinished handler(path.string()); + returnAnswer(fileReference, handler); + } + + void downloadFailed(const std::string& fileReference, + filedistribution::FileProvider::FailedDownloadReason reason) { + + DownloadFailed handler(reason); + returnAnswer(fileReference, handler); + } + + void shutdown() { + LockGuard guard(_mutex); + _shuttingDown = true; + + BOOST_FOREACH( const Map::value_type& request, _queuedRequests) { + LOG(info, "Shutdown: Aborting earlier enqueued request for file reference '%s'.", request.first.c_str()); + abort(request.second); + } + _queuedRequests.erase(_queuedRequests.begin(), _queuedRequests.end()); + } +}; + +} 
//anonymous namespace + +class FileDistributorRPC::Server : public FRT_Invokable, boost::noncopyable { + public: + boost::shared_ptr<FileProvider> _fileProvider; + std::unique_ptr<FRT_Supervisor> _supervisor; + + QueuedRequests _queuedRequests; + + boost::signals2::scoped_connection _downloadCompletedConnection; + boost::signals2::scoped_connection _downloadFailedConnection; + + void queueRequest(const std::string& fileReference, FRT_RPCRequest* request); + void defineMethods(); + + Server(int listen_port, + const boost::shared_ptr<FileProvider>& provider); + void start(const boost::shared_ptr<FileDistributorRPC> parent); + ~Server(); + + void waitFor(FRT_RPCRequest*); +}; + +FileDistributorRPC:: +Server::Server(int listen_port, + const boost::shared_ptr<filedistribution::FileProvider>& provider) + :_fileProvider(provider), + _supervisor(new FRT_Supervisor()) +{ + defineMethods(); + _supervisor->Listen(listen_port); + _supervisor->Start(); +} + + +FileDistributorRPC:: +Server::~Server() { + _queuedRequests.shutdown(); + + const bool waitForFinished = true; + _supervisor->ShutDown(waitForFinished); +} + +void +FileDistributorRPC::Server::start(const boost::shared_ptr<FileDistributorRPC> parent) { + _downloadCompletedConnection = + _fileProvider->downloadCompleted().connect(FileProvider::DownloadCompletedSignal::slot_type( + ll::bind(&QueuedRequests::downloadFinished, &_queuedRequests, ll::_1, ll::_2)). + track(parent)); + + _downloadFailedConnection = + _fileProvider->downloadFailed().connect(FileProvider::DownloadFailedSignal::slot_type( + ll::bind(&QueuedRequests::downloadFailed, &_queuedRequests, ll::_1, ll::_2)). + track(parent)); + + +} + +void +FileDistributorRPC:: +Server::queueRequest(const std::string& fileReference, FRT_RPCRequest* request) { + _queuedRequests.enqueue( fileReference, request ); + try { + _fileProvider->downloadFile(fileReference); + } catch(...) 
{ + _queuedRequests.dequeue(fileReference, request); + throw; + } +} + +void +FileDistributorRPC:: +Server::defineMethods() { + const bool instant = true; + FRT_ReflectionBuilder builder(_supervisor.get()); + builder.DefineMethod("waitFor", "s", "s", instant, + FRT_METHOD(Server::waitFor), this); +} + +void +FileDistributorRPC:: +Server::waitFor(FRT_RPCRequest* request) { + try { + frtstream::FrtServerStream requestHandler(request); + std::string fileReference; + requestHandler >> fileReference; + boost::optional<boost::filesystem::path> path + = _fileProvider->getPath(fileReference); + if (path) { + LOG(debug, "Returning request for file reference '%s'.", fileReference.c_str()); + requestHandler << path->string(); + } else { + LOG(debug, "Enqueuing file request for file reference '%s'.", fileReference.c_str()); + request->Detach(); + queueRequest(fileReference, request); + } + } catch (const FileDoesNotExistException&) { + LOG(warning, "Recieved a request for a file reference that does not exist in zookeeper."); + request->SetError(RPCErrorCodes::baseFileProviderErrorCode + FileProvider::FileReferenceDoesNotExist, + "No such file reference"); + request->Return(); + } catch (const std::exception& e) { + LOG(error, "An exception occurred while calling the rpc method waitFor:%s", + boost::diagnostic_information(e).c_str()); + request->SetError(RPCErrorCodes::unknownError, boost::diagnostic_information(e).c_str()); + request->Return(); //the request might be detached. 
+ } +} + +FileDistributorRPC::FileDistributorRPC(const std::string& connectionSpec, + const boost::shared_ptr<filedistribution::FileProvider>& provider) + :_server(new Server(get_port(connectionSpec), provider)) +{} + +void +FileDistributorRPC::start() +{ + _server->start(shared_from_this()); +} + +int +FileDistributorRPC::get_port(const std::string &spec) +{ + const char *port = (spec.data() + spec.size()); + while ((port > spec.data()) && (port[-1] >= '0') && (port[-1] <= '9')) { + --port; + } + return atoi(port); +} + +filedistribution::FileDistributorRPC::~FileDistributorRPC() +{ + LOG(debug, "Deconstructing FileDistributorRPC"); +} diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h new file mode 100644 index 00000000000..e527fc2aeca --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <memory> +#include <boost/enable_shared_from_this.hpp> +#include <boost/noncopyable.hpp> + +#include "fileprovider.h" + +namespace filedistribution { + +class FileDistributorRPC : boost::noncopyable, + public boost::enable_shared_from_this<FileDistributorRPC> +{ + class Server; +public: + FileDistributorRPC(const std::string& connectSpec, + const boost::shared_ptr<FileProvider>& provider); + + void start(); + + static int get_port(const std::string &spec); + + ~FileDistributorRPC(); +private: + std::unique_ptr<Server> _server; +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h new file mode 100644 index 00000000000..a95b50fc0f2 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include<boost/optional.hpp> +#include<boost/filesystem/path.hpp> +#include<boost/signals2.hpp> + +namespace filedistribution { + +class FileProvider +{ +public: + typedef boost::signals2::signal<void (const std::string& /* fileReference */, + const boost::filesystem::path&)> + DownloadCompletedSignal; + typedef DownloadCompletedSignal::slot_type DownloadCompletedHandler; + + enum FailedDownloadReason { + FileReferenceDoesNotExist, + FileReferenceRemoved + }; + + typedef boost::signals2::signal<void (const std::string& /* fileReference */, + FailedDownloadReason)> + DownloadFailedSignal; + typedef DownloadFailedSignal::slot_type DownloadFailedHandler; + + virtual boost::optional<boost::filesystem::path> getPath(const std::string& fileReference) = 0; + virtual void downloadFile(const std::string& fileReference) = 0; + + virtual ~FileProvider() {} + + //Signals + virtual DownloadCompletedSignal& downloadCompleted() = 0; + virtual DownloadFailedSignal& downloadFailed() = 0; +}; + +} //namespace filedistribution + diff --git a/filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt new file mode 100644 index 00000000000..e3b90c464c3 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_library(filedistribution_filedistributorrpcconfig STATIC + SOURCES + DEPENDS +) +vespa_generate_config(filedistribution_filedistributorrpcconfig filedistributorrpc.def) +install(FILES filedistributorrpc.def DESTINATION var/db/vespa/config_server/serverdb/classes) diff --git a/filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def b/filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def new file mode 100644 index 00000000000..ceadd8ae8c8 --- /dev/null +++ b/filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def @@ -0,0 +1,3 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +namespace=cloud.config.filedistribution +connectionspec string |