diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-02-08 17:30:00 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-02-08 17:30:00 +0000 |
commit | e4c657f4c777eb61e1c8d88aba5e52a0862212ff (patch) | |
tree | eae98c8cad6125af035eac3c1bbb9530fbb15c69 | |
parent | e90d8aeecfd7c39826a2ee261c4320b3d2260ac0 (diff) |
Remove filedistribution cpp code
88 files changed, 10 insertions, 6204 deletions
diff --git a/filedistribution/CMakeLists.txt b/filedistribution/CMakeLists.txt deleted file mode 100644 index 8baee25a192..00000000000 --- a/filedistribution/CMakeLists.txt +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_define_module( - DEPENDS - fastos - vespalog - fnet - frtstream - configdefinitions - config_cloudconfig - staging_vespalib - vespadefaults - vespalib - fileacquirer - - EXTERNAL_DEPENDS - torrent-rasterbar - - LIBS - src/vespa/filedistribution/rpc - src/vespa/filedistribution/common - src/vespa/filedistribution/manager - src/vespa/filedistribution/distributor - src/vespa/filedistribution/model - - TESTS - src/tests/zkfacade - src/tests/zkfiledbmodel - src/tests/filedbmodelimpl - src/tests/status - src/tests/scheduler - src/tests/rpc - src/tests/common - src/tests/filedownloader - src/tests/lib - - APPS - src/apps/status - src/apps/filedistributor -) diff --git a/filedistribution/src/apps/.gitignore b/filedistribution/src/apps/.gitignore deleted file mode 100644 index f3c7a7c5da6..00000000000 --- a/filedistribution/src/apps/.gitignore +++ /dev/null @@ -1 +0,0 @@ -Makefile diff --git a/filedistribution/src/apps/filedistributor/.gitignore b/filedistribution/src/apps/filedistributor/.gitignore deleted file mode 100644 index 89aaa70637e..00000000000 --- a/filedistribution/src/apps/filedistributor/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/filedistributor -vespa-filedistributor-bin diff --git a/filedistribution/src/apps/filedistributor/CMakeLists.txt b/filedistribution/src/apps/filedistributor/CMakeLists.txt deleted file mode 100644 index 22ffcddcd6f..00000000000 --- a/filedistribution/src/apps/filedistributor/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_filedistributor_app - SOURCES - filedistributor.cpp - OUTPUT_NAME vespa-filedistributor-bin - INSTALL sbin - DEPENDS - filedistribution_distributor - filedistribution_filedistributionmodel - filedistribution_filedistributorrpc - filedistribution_common -) -target_compile_options(filedistribution_filedistributor_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp deleted file mode 100644 index f2bf5fa0ccd..00000000000 --- a/filedistribution/src/apps/filedistributor/filedistributor.cpp +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/config-filedistributor.h> -#include <vespa/config-filereferences.h> - -#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h> -#include <vespa/filedistribution/distributor/filedownloadermanager.h> -#include <vespa/filedistribution/distributor/signalhandling.h> -#include <vespa/filedistribution/distributor/state_server_impl.h> - -#include <vespa/filedistribution/model/filedistributionmodelimpl.h> -#include <vespa/filedistribution/rpc/filedistributorrpc.h> -#include <vespa/filedistribution/common/componentsdeleter.h> -#include <vespa/fileacquirer/config-filedistributorrpc.h> -#include <vespa/config/common/exceptions.h> -#include <vespa/config-zookeepers.h> -#include <vespa/fastos/app.h> -#include <boost/program_options.hpp> - -namespace { -const char* programName = "filedistributor"; -} - -#include <vespa/log/log.h> -LOG_SETUP(programName); - -using namespace std::literals; - -using namespace filedistribution; -using cloud::config::ZookeepersConfig; -using cloud::config::filedistribution::FiledistributorConfig; -using cloud::config::filedistribution::FiledistributorrpcConfig; - -class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>, - public config::IFetcherCallback<FiledistributorConfig>, - public config::IFetcherCallback<FiledistributorrpcConfig>, - public config::IGenerationCallback -{ - class Components { - ComponentsDeleter _componentsDeleter; - public: - const std::shared_ptr<ZKFacade> _zk; - const std::shared_ptr<FileDistributionModelImpl> _model; - const std::shared_ptr<FileDistributorTrackerImpl> _tracker; - const std::shared_ptr<FileDownloader> _downloader; - const FileDownloaderManager::SP _manager; - const FileDistributorRPC::SP _rpcHandler; - const std::shared_ptr<StateServerImpl> _stateServer; - - private: - class GuardedThread { - public: - GuardedThread(const GuardedThread &) = delete; - GuardedThread & operator = (const GuardedThread &) = delete; - GuardedThread(const std::shared_ptr<FileDownloader> & downloader) : - _downloader(downloader), - _thread([downloader=_downloader] () { downloader->runEventLoop(); }) - { } - ~GuardedThread() { - _downloader->close(); - if (_thread.joinable()) { - _thread.join(); - } - if ( !_downloader->drained() ) { - LOG(error, "The filedownloader did not drain fully. We will just exit quickly and let a restart repair it for us."); - std::quick_exit(67); - } - } - private: - std::shared_ptr<FileDownloader> _downloader; - std::thread _thread; - }; - std::unique_ptr<GuardedThread> _downloaderEventLoopThread; - config::ConfigFetcher _configFetcher; - - template <class T> - typename std::shared_ptr<T> track(T* component) { - return _componentsDeleter.track(component); - } - - public: - Components(const Components &) = delete; - Components & operator = (const Components &) = delete; - - Components(const config::ConfigUri & configUri, - const ZookeepersConfig& zooKeepersConfig, - const FiledistributorConfig& fileDistributorConfig, - const FiledistributorrpcConfig& rpcConfig) - :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, false))), - _model(track(new FileDistributionModelImpl(fileDistributorConfig.hostname, fileDistributorConfig.torrentport, _zk))), - _tracker(track(new FileDistributorTrackerImpl(_model))), - _downloader(track(new FileDownloader(_tracker, fileDistributorConfig.hostname, fileDistributorConfig.torrentport, Path(fileDistributorConfig.filedbpath)))), - _manager(track(new FileDownloaderManager(_downloader, _model))), - _rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))), - _stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))), - _downloaderEventLoopThread(), - _configFetcher(configUri.getContext()) - { - _downloaderEventLoopThread = std::make_unique<GuardedThread>(_downloader); - _manager->start(); - _rpcHandler->start(); - - _tracker->setDownloader(_downloader); - _configFetcher.subscribe<FilereferencesConfig>(configUri.getConfigId(), _model.get()); - _configFetcher.start(); - updatedConfig(_configFetcher.getGeneration()); - } - - void updatedConfig(int64_t generation) { - vespalib::ComponentConfigProducer::Config curr("filedistributor", generation); - _stateServer->myComponents.addConfig(curr); - } - - ~Components() { - _configFetcher.close(); - //Do not waste time retrying zookeeper operations when going down. - _zk->disableRetries(); - - _downloaderEventLoopThread.reset(); - } - - }; - - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _configMutex; - - bool _completeReconfigurationNeeded; - std::unique_ptr<ZookeepersConfig> _zooKeepersConfig; - std::unique_ptr<FiledistributorConfig> _fileDistributorConfig; - std::unique_ptr<FiledistributorrpcConfig> _rpcConfig; - - std::unique_ptr<Components> _components; -public: - FileDistributor(const FileDistributor &) = delete; - FileDistributor & operator = (const FileDistributor &) = delete; - FileDistributor(); - ~FileDistributor(); - - void notifyGenerationChange(int64_t generation) override { - if (_components && ! completeReconfigurationNeeded()) { - _components->updatedConfig(generation); - } - } - - //configure overrides - void configure(std::unique_ptr<ZookeepersConfig> config) override { - LockGuard guard(_configMutex); - _zooKeepersConfig = std::move(config); - _completeReconfigurationNeeded = true; - } - - void configure(std::unique_ptr<FiledistributorConfig> config) override { - LockGuard guard(_configMutex); - if (_fileDistributorConfig.get() != NULL && - (config->torrentport != _fileDistributorConfig->torrentport || - config->stateport != _fileDistributorConfig->stateport || - config->hostname != _fileDistributorConfig->hostname || - config->filedbpath != _fileDistributorConfig->filedbpath)) - { - _completeReconfigurationNeeded = true; - } else if (_components.get()) { - configureSpeedLimits(*config); - } - _fileDistributorConfig = std::move(config); - - } - - void configure(std::unique_ptr<FiledistributorrpcConfig> config) override { - LockGuard guard(_configMutex); - _rpcConfig = std::move(config); - _completeReconfigurationNeeded = true; - } - - void run(const config::ConfigUri & configUri) { - while (!askedToShutDown()) { - clearReinitializeFlag(); - runImpl(configUri); - } - } - - bool isConfigComplete() { - LockGuard guard(_configMutex); - return (_zooKeepersConfig && _fileDistributorConfig && _rpcConfig); - } - void createComponents(const config::ConfigUri & configUri) { - LockGuard guard(_configMutex); - _components.reset( - new Components(configUri, - *_zooKeepersConfig, - *_fileDistributorConfig, - *_rpcConfig)); - - configureSpeedLimits(*_fileDistributorConfig); - _completeReconfigurationNeeded = false; - } - - bool completeReconfigurationNeeded() { - LockGuard guard(_configMutex); - if (_completeReconfigurationNeeded) { - LOG(debug, "Complete reconfiguration needed"); - } - return _completeReconfigurationNeeded; - } - - void configureSpeedLimits(const FiledistributorConfig& config) { - FileDownloader& downloader = *_components->_downloader; - downloader.setMaxDownloadSpeed(config.maxdownloadspeed); - downloader.setMaxUploadSpeed(config.maxuploadspeed); - } - - void runImpl(const config::ConfigUri & configUri) { - createComponents(configUri); - - // We do not want back to back reinitializing as it gives zero time for serving - // some torrents. - int postPoneAskedToReinitializedSecs = 50; - - while (!askedToShutDown() && - (postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) && - !completeReconfigurationNeeded()) - { - postPoneAskedToReinitializedSecs--; - std::this_thread::sleep_for(1s); - } - _components.reset(); - } -}; - -FileDistributor::FileDistributor() - : _configMutex(), - _completeReconfigurationNeeded(false), - _zooKeepersConfig(), - _fileDistributorConfig(), - _rpcConfig(), - _components() -{ } -FileDistributor::~FileDistributor() { } - -class FileDistributorApplication : public FastOS_Application { - const config::ConfigUri _configUri; -public: - FileDistributorApplication(const config::ConfigUri & configUri); - - int Main() override; -}; - -namespace { - -struct ProgramOptionException { - std::string _msg; - ProgramOptionException(const std::string & msg) - : _msg(msg) - {} -}; - -bool exists(const std::string& optionName, const boost::program_options::variables_map& map) { - return map.find(optionName) != map.end(); -} - -void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map ) { - if (!exists(optionName, map)) { - throw ProgramOptionException("Error: Missing option " + optionName); - } -} - -} //anonymous namespace - -FileDistributorApplication::FileDistributorApplication(const config::ConfigUri & configUri) - :_configUri(configUri) { -} - -int -FileDistributorApplication::Main() { - try { - FileDistributor distributor; - - config::ConfigFetcher configFetcher(_configUri.getContext()); - configFetcher.subscribe<ZookeepersConfig>(_configUri.getConfigId(), &distributor); - configFetcher.subscribe<FiledistributorConfig>(_configUri.getConfigId(), &distributor); - configFetcher.subscribe<FiledistributorrpcConfig>(_configUri.getConfigId(), &distributor); - configFetcher.subscribeGenerationChanges(&distributor); - configFetcher.start(); - - while (! distributor.isConfigComplete() ) { - std::this_thread::sleep_for(10ms); - } - distributor.run(_configUri); - - EV_STOPPING(programName, "Clean exit"); - return 0; - } catch (const FileDoesNotExistException & e) { - EV_STOPPING(programName, e.what()); - return 1; - } catch (const ZKNodeDoesNotExistsException & e) { - EV_STOPPING(programName, e.what()); - return 2; - } catch (const ZKSessionExpired & e) { - EV_STOPPING(programName, e.what()); - return 3; - } catch (const config::ConfigTimeoutException & e) { - EV_STOPPING(programName, e.what()); - return 4; - } catch (const vespalib::PortListenException & e) { - EV_STOPPING(programName, e.what()); - return 5; - } catch (const ZKConnectionLossException & e) { - EV_STOPPING(programName, e.what()); - return 6; - } catch (const ZKFailedConnecting & e) { - EV_STOPPING(programName, e.what()); - return 7; - } catch (const config::InvalidConfigException & e) { - EV_STOPPING(programName, e.what()); - return 8; - } catch (const ZKOperationTimeoutException & e) { - EV_STOPPING(programName, e.what()); - return 9; - } catch (const ZKGenericException & e) { - EV_STOPPING(programName, e.what()); - return 99; - } -} - -int -executeApplication(int argc, char** argv) { - const char - *configId("configid"), - *help("help"); - - namespace po = boost::program_options; - po::options_description description; - description.add_options() - (configId, po::value<std::string > (), "id to request config for") - (help, "help"); - - try { - po::variables_map values; - po::store(po::parse_command_line(argc, argv, description), values); - - if (exists(help, values)) { - std::cout <<description; - return 0; - } - ensureExists(configId, values); - - FileDistributorApplication application(values[configId].as<std::string > ()); - return application.Entry(argc, argv); - - } catch(ProgramOptionException& e) { - std::cerr <<e._msg <<std::endl; - return -1; - } -} - -namespace { - -class InitSignals { -public: - InitSignals() { initSignals(); } -}; - -InitSignals _G_initSignals __attribute__ ((init_priority (101))); - -} - -int -main(int argc, char** argv) { - if (askedToShutDown()) { return 0; } - EV_STARTED(programName); - - std::srand(std::time(0)); - filedistribution::ZKLogging loggingGuard; - - return executeApplication(argc, argv); -} diff --git a/filedistribution/src/apps/status/.gitignore b/filedistribution/src/apps/status/.gitignore deleted file mode 100644 index 2105a3c7051..00000000000 --- a/filedistribution/src/apps/status/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/vespa-status-filedistribution-bin -filedistribution_status-filedistribution_app diff --git a/filedistribution/src/apps/status/CMakeLists.txt b/filedistribution/src/apps/status/CMakeLists.txt deleted file mode 100644 index e666bc2cecc..00000000000 --- a/filedistribution/src/apps/status/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_status-filedistribution_app - SOURCES - status-filedistribution.cpp - INSTALL bin - OUTPUT_NAME vespa-status-filedistribution-bin - DEPENDS - filedistribution_filedistributionmodel - filedistribution_common -) -target_compile_options(filedistribution_status-filedistribution_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_install_script(vespa-status-filedistribution.sh vespa-status-filedistribution bin) diff --git a/filedistribution/src/apps/status/status-filedistribution.cpp b/filedistribution/src/apps/status/status-filedistribution.cpp deleted file mode 100644 index 52cabdfcb2d..00000000000 --- a/filedistribution/src/apps/status/status-filedistribution.cpp +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/filedistribution/model/zkfacade.h> -#include <vespa/filedistribution/model/filedistributionmodel.h> -#include <vespa/filedistribution/model/filedistributionmodelimpl.h> -#include <zookeeper/zookeeper.h> -#include <boost/program_options.hpp> - -#include <vespa/log/log.h> -#include <iostream> -#include <map> -#include <thread> - -LOG_SETUP("status-filedistribution"); - -using namespace filedistribution; -using namespace std::literals; -namespace po = boost::program_options; - -std::string -plural(size_t size) -{ - if (size == 1) - return ""; - else - return "s"; -} - -template <class CONT> -std::string -plural(const CONT& cont) -{ - size_t size = cont.size(); - return plural(size); -} - -typedef FileDBModel::HostStatus HostStatus; -typedef std::map<std::string, HostStatus> StatusByHostName; - -void -printWaitingForHosts(const StatusByHostName& notFinishedHosts) -{ - std::cout <<"Waiting for the following host" <<plural(notFinishedHosts) <<":" <<std::endl; - for (const StatusByHostName::value_type & hostNameAndStatus : notFinishedHosts) { - std::cout <<hostNameAndStatus.first <<" ("; - - const HostStatus& hostStatus = hostNameAndStatus.second; - if (hostStatus._state == HostStatus::notStarted) { - std::cout <<"Not started"; - } else { - std::cout <<"Downloading, " - <<hostStatus._numFilesFinished <<"/" <<hostStatus._numFilesToDownload - <<" file" <<plural(hostStatus._numFilesToDownload) <<" completed"; - } - std::cout <<")" <<std::endl; - } -} - -//TODO:refactor -int printStatus(const std::string& zkservers) -{ - std::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers, true)); - - std::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk)); - - std::vector<std::string> hosts = model->getHosts(); - - StatusByHostName notFinishedHosts; - StatusByHostName finishedHosts; - bool hasStarted = false; - - for (const std::string & host : hosts) { - HostStatus hostStatus = model->getHostStatus(host); - switch (hostStatus._state) { - case HostStatus::finished: - hasStarted = true; - finishedHosts[host] = hostStatus; - break; - case HostStatus::inProgress: - hasStarted = true; - //@fallthrough@ - case HostStatus::notStarted: - notFinishedHosts[host] = hostStatus; - break; - } - } - - if (notFinishedHosts.empty()) { - std::cout <<"Finished distributing files to all hosts." <<std::endl; - return 0; - } else if (!hasStarted) { - std::cout <<"File distribution has not yet started." <<std::endl; - return 0; - } else { - printWaitingForHosts(notFinishedHosts); - return 5; - } -} - -int -printStatusRetryIfZKProblem(const std::string& zkservers, const std::string& zkLogFile) -{ - FILE* file = fopen(zkLogFile.c_str(), "w"); - if (file == NULL) { - std::cerr <<"Could not open file " <<zkLogFile <<std::endl; - } else { - zoo_set_log_stream(file); - } - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); - - for (int i = 0; i < 5; ++i) { - try { - return printStatus(zkservers); - } catch (ZKNodeDoesNotExistsException& e) { - LOG(debug, "Node does not exists, assuming concurrent update. %s", e.what()); - - } catch (ZKSessionExpired& e) { - LOG(debug, "Session expired."); - } - std::this_thread::sleep_for(500ms); - } - return 4; -} - - -//TODO: move to common -struct ProgramOptionException { - std::string _msg; - ProgramOptionException(const std::string & msg) - : _msg(msg) - {} -}; - -bool exists(const std::string& optionName, const po::variables_map& map) { - return map.find(optionName) != map.end(); -} - -void ensureExists(const std::string& optionName, const po::variables_map& map) { - if (!exists(optionName, map)) { - throw ProgramOptionException("Error: Missing option " + optionName); - } -} -//END: move to common - - -int -main(int argc, char** argv) { - const char - *zkstring = "zkstring", - *zkLogFile = "zkLogFile", - *help = "help"; - - po::options_description description; - description.add_options() - (zkstring, po::value<std::string > (), "The zookeeper servers to connect to, separated by comma") - (zkLogFile, po::value<std::string >() -> default_value("/dev/null"), "Zookeeper log file") - (help, "help"); - - try { - po::variables_map values; - po::store(po::parse_command_line(argc, argv, description), values); - - if (exists(help, values)) { - std::cout <<description; - return 0; - } - - ensureExists(zkstring, values); - - return printStatusRetryIfZKProblem( - values[zkstring] .as<std::string>(), - values[zkLogFile].as<std::string>()); - } catch (const ProgramOptionException& e) { - std::cerr <<e._msg <<std::endl; - return 3; - } catch(const std::exception& e) { - std::cerr <<"Error: " <<e.what() <<std::endl; - return 3; - } -} diff --git a/filedistribution/src/apps/status/vespa-status-filedistribution.sh b/filedistribution/src/main/sh/vespa-status-filedistribution.sh index 65a6da89b56..119999c9191 100644 --- a/filedistribution/src/apps/status/vespa-status-filedistribution.sh +++ b/filedistribution/src/main/sh/vespa-status-filedistribution.sh @@ -60,22 +60,15 @@ findroot ROOT=${VESPA_HOME%/} -if [ "$cloudconfig_server__disable_filedistributor" = "" ] || [ "$cloudconfig_server__disable_filedistributor" != "true" ]; then - ZKSTRING=$($ROOT/libexec/vespa/vespa-config.pl -zkstring) - test -z "$VESPA_LOG_LEVEL" && VESPA_LOG_LEVEL=warning - export VESPA_LOG_LEVEL - exec $ROOT/bin/vespa-status-filedistribution-bin --zkstring "$ZKSTRING" $@ -else - if [ "$cloudconfig_server__environment" != "" ]; then - environment="--environment $cloudconfig_server__environment" - fi - if [ "$cloudconfig_server__region" != "" ]; then - region="--region $cloudconfig_server__region" - fi +if [ "$cloudconfig_server__environment" != "" ]; then + environment="--environment $cloudconfig_server__environment" +fi +if [ "$cloudconfig_server__region" != "" ]; then + region="--region $cloudconfig_server__region" +fi - defaults="--tenant default --application default --instance default" - jvmoptions="-XX:MaxJavaStackTraceDepth=-1 $(getJavaOptionsIPV46) -Xms48m -Xmx48m" - jar="-cp $VESPA_HOME/lib/jars/filedistribution-jar-with-dependencies.jar" +defaults="--tenant default --application default --instance default" +jvmoptions="-XX:MaxJavaStackTraceDepth=-1 $(getJavaOptionsIPV46) -Xms48m -Xmx48m" +jar="-cp $VESPA_HOME/lib/jars/filedistribution-jar-with-dependencies.jar" - exec java $jvmoptions $jar com.yahoo.vespa.filedistribution.status.FileDistributionStatusClient $defaults $environment $region "$@" -fi +exec java $jvmoptions $jar com.yahoo.vespa.filedistribution.status.FileDistributionStatusClient $defaults $environment $region "$@" diff --git a/filedistribution/src/tests/.gitignore b/filedistribution/src/tests/.gitignore deleted file mode 100644 index ee50e13466c..00000000000 --- a/filedistribution/src/tests/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -Makefile -*_test diff --git a/filedistribution/src/tests/common/.gitignore b/filedistribution/src/tests/common/.gitignore deleted file mode 100644 index 060721ea295..00000000000 --- a/filedistribution/src/tests/common/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_common_test_app diff --git a/filedistribution/src/tests/common/CMakeLists.txt b/filedistribution/src/tests/common/CMakeLists.txt deleted file mode 100644 index 9f142eec9e7..00000000000 --- a/filedistribution/src/tests/common/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_common_test_app TEST - SOURCES - testCommon.cpp - DEPENDS -) -target_compile_options(filedistribution_common_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_common_test_app NO_VALGRIND COMMAND filedistribution_common_test_app) diff --git a/filedistribution/src/tests/common/testCommon.cpp b/filedistribution/src/tests/common/testCommon.cpp deleted file mode 100644 index 8ad17b6b7ce..00000000000 --- a/filedistribution/src/tests/common/testCommon.cpp +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN - -#include <vespa/filedistribution/common/buffer.h> - -#include <boost/test/unit_test.hpp> -#include <string> - -namespace fd = filedistribution; - -const size_t bufferCapacity = 10; - -fd::Buffer -getBuffer() { - const char* test = "test"; - fd::Buffer buffer(test, test + strlen(test)); - buffer.reserve(bufferCapacity); - buffer.push_back(0); - return buffer; -} - -BOOST_AUTO_TEST_CASE(bufferTest) { - fd::Buffer buffer(getBuffer()); - BOOST_CHECK(buffer.begin() != 0); - BOOST_CHECK_EQUAL(bufferCapacity, buffer.capacity()); - BOOST_CHECK_EQUAL(5u, buffer.size()); - BOOST_CHECK_EQUAL(std::string("test"), buffer.begin()); -} - -struct Callback { - bool* _called; - Callback(bool *called) - :_called(called) - {} - - void operator()(const std::string& str) { - BOOST_CHECK_EQUAL("abcd", str); - *_called = true; - } -}; diff --git a/filedistribution/src/tests/filedbmodelimpl/.gitignore b/filedistribution/src/tests/filedbmodelimpl/.gitignore deleted file mode 100644 index 05a25df1258..00000000000 --- a/filedistribution/src/tests/filedbmodelimpl/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_filedbmodelimpl_test_app diff --git a/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt b/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt deleted file mode 100644 index a97cc9e4c7a..00000000000 --- a/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_filedbmodelimpl_test_app TEST - SOURCES - test-filedistributionmodelimpl.cpp - DEPENDS - filedistribution_filedistributionmodel - filedistribution_common - filedistribution_mocks -) -target_compile_options(filedistribution_filedbmodelimpl_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_filedbmodelimpl_test_app NO_VALGRIND COMMAND filedistribution_filedbmodelimpl_test_app) diff --git a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp deleted file mode 100644 index 1de7fc817ae..00000000000 --- a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN -#define BOOST_TEST_MODULE filedbmodelimpl test -#include <boost/test/unit_test.hpp> - -#include <iostream> -#include <vector> -#include <vespa/filedistribution/common/componentsdeleter.h> -#include <vespa/filedistribution/model/filedistributionmodelimpl.h> -#include <vespa/filedistribution/model/zkfacade.h> -#include <zookeeper/zookeeper.h> - - -using namespace filedistribution; - - -namespace { - - -struct Fixture { - ComponentsDeleter _componentsDeleter; - std::shared_ptr<ZKFacade> _zk; - std::shared_ptr<FileDistributionModelImpl> _distModel; - Fixture() { - _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", false)); - _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk)); - } - ~Fixture() { } -}; - -} //anonymous namespace - - -BOOST_FIXTURE_TEST_SUITE(FileDistributionModelImplTests, Fixture) - -BOOST_AUTO_TEST_CASE(configServersAsPeers) -{ - std::vector<std::string> peers; - peers.push_back("old"); - peers.push_back("config:123"); - peers.push_back("config:567"); - peers.push_back("foo:123"); - _distModel->addConfigServersAsPeers(peers, "config,configTwo", 123); - BOOST_CHECK(peers.size() == 5); - BOOST_CHECK(peers[4] == "configTwo:123"); - _distModel->addConfigServersAsPeers(peers, NULL, 123); - BOOST_CHECK(peers.size() == 5); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/tests/filedownloader/.gitignore b/filedistribution/src/tests/filedownloader/.gitignore deleted file mode 100644 index f7bb72b4791..00000000000 --- a/filedistribution/src/tests/filedownloader/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_filedownloader_test_app diff --git a/filedistribution/src/tests/filedownloader/CMakeLists.txt b/filedistribution/src/tests/filedownloader/CMakeLists.txt deleted file mode 100644 index 7bdfeda3525..00000000000 --- a/filedistribution/src/tests/filedownloader/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_filedownloader_test_app TEST - SOURCES - testfiledownloader.cpp - DEPENDS - filedistribution_filedistributionmodel - filedistribution_common -) -target_compile_options(filedistribution_filedownloader_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_filedownloader_test_app NO_VALGRIND COMMAND filedistribution_filedownloader_test_app) diff --git a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp deleted file mode 100644 index c03c6ca5e6c..00000000000 --- a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN - -#include <vespa/filedistribution/distributor/filedownloader.h> -#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h> - -#include <fstream> - -#include <boost/test/unit_test.hpp> -#include <boost/filesystem.hpp> -#include <boost/filesystem/fstream.hpp> - -#include <vespa/filedistribution/manager/createtorrent.h> -#include <vespa/filedistribution/common/componentsdeleter.h> - -namespace fs = boost::filesystem; - -using namespace filedistribution; - -namespace { -const std::string localHost("localhost"); -const int uploaderPort = 9113; -const int downloaderPort = 9112; - -#if 0 -std::shared_ptr<FileDownloader> -createDownloader(ComponentsDeleter& deleter, - int port, const fs::path& downloaderPath, - const std::shared_ptr<FileDistributionModel>& model) -{ - std::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model))); - std::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker, - localHost, port, downloaderPath))); - - tracker->setDownloader(downloader); - return downloader; -} -#endif - -} //anonymous namespace - -class MockFileDistributionModel : public FileDistributionModel { - FileDBModel& getFileDBModel() override { - abort(); - } - - std::set<std::string> getFilesToDownload() override { - return std::set<std::string>(); - } - - PeerEntries getPeers(const std::string& , size_t) override { - PeerEntries peers(2); - peers[0].ip = localHost; - peers[0].port = uploaderPort; - - peers[1].ip = localHost; - peers[1].port = downloaderPort; - - return peers; - } - - void addPeer(const std::string&) override {} - void removePeer(const std::string&) override {} - void peerFinished(const std::string&) override {} -}; - - -#if 0 -BOOST_AUTO_TEST_CASE(fileDownloaderTest) { - fs::path testPath = "/tmp/filedownloadertest"; - fs::remove_all(testPath); - - fs::path downloaderPath = testPath / "downloader"; - fs::path uploaderPath = testPath / "uploader"; - - const std::string fileReference = "0123456789012345678901234567890123456789"; - const std::string fileToSend = "filetosend.txt"; - - fs::create_directories(downloaderPath); - fs::create_directories(uploaderPath / fileReference); - - fs::path fileToUploadPath = uploaderPath / fileReference / "filetosend.txt"; - - { - fs::ofstream stream(fileToUploadPath); - stream <<"Hello, world!" <<std::endl; - } - - CreateTorrent createTorrent(fileToUploadPath); - Buffer buffer(createTorrent.bencode()); - - ComponentsDeleter deleter; - - std::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel())); - std::shared_ptr<FileDownloader> downloader = - createDownloader(deleter, downloaderPort, downloaderPath, model); - - std::shared_ptr<FileDownloader> uploader = - createDownloader(deleter, uploaderPort, uploaderPath, model); - - std::thread uploaderThread( [uploader] () { uploader->runEventLoop(); }); - std::thread downloaderThread( [downloader] () { downloader->runEventLoop(); }); - - uploader->addTorrent(fileReference, buffer); - downloader->addTorrent(fileReference, buffer); - - sleep(5); - BOOST_CHECK(fs::exists(downloaderPath / fileReference / fileToSend)); - - uploaderThread.interrupt(); - uploaderThread.join(); - - downloaderThread.interrupt(); - downloaderThread.join(); - - fs::remove_all(testPath); -} -#endif - -//TODO: cleanup -libtorrent::sha1_hash -toInfoHash(const std::string& fileReference) { - assert (fileReference.size() == 40); - std::istringstream s(fileReference); - - libtorrent::sha1_hash infoHash; - s >> infoHash; - return infoHash; -} - -BOOST_AUTO_TEST_CASE(test_filereference_infohash_conversion) { - const std::string fileReference = "3a281c905c9b6ebe4d969037a198454fedefbdf3"; - - libtorrent::sha1_hash infoHash = toInfoHash(fileReference); - - std::ostringstream fileReferenceString; - fileReferenceString <<infoHash; - - BOOST_CHECK(fileReference == fileReferenceString.str()); - - std::cout <<fileReference <<std::endl <<fileReferenceString.str() <<std::endl; -} diff --git a/filedistribution/src/tests/lib/CMakeLists.txt b/filedistribution/src/tests/lib/CMakeLists.txt deleted file mode 100644 index 735a735e1ba..00000000000 --- a/filedistribution/src/tests/lib/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(filedistribution_mocks STATIC - SOURCES - mock-zookeeper.cpp - DEPENDS -) diff --git a/filedistribution/src/tests/lib/mock-zookeeper.cpp b/filedistribution/src/tests/lib/mock-zookeeper.cpp deleted file mode 100644 index 5416afdc1fe..00000000000 --- a/filedistribution/src/tests/lib/mock-zookeeper.cpp +++ /dev/null @@ -1,327 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <zookeeper/zookeeper.h> - -#include <string> -#include <map> -#include <cassert> -#include <cstring> -#include <vector> - -#include <thread> -#include <atomic> -#include <boost/lexical_cast.hpp> - -#include <iostream> - -#include <vespa/filedistribution/common/concurrentqueue.h> - -using std::map; -using std::string; -using std::vector; -using std::pair; -using std::make_pair; -using filedistribution::ConcurrentQueue; - -namespace { -std::pair<string, string> parentPathAndChildName(const string& childPath) -{ - if (childPath.empty()) { - return std::make_pair("", ""); - } else { - assert (childPath[0] == '/'); - - size_t index = childPath.find_last_of("/"); - return std::make_pair(childPath.substr(0, index), childPath.substr(index + 1)); - } -} - -struct Node { - typedef map<string, Node> Children; - Children children; - bool exists; - bool ephemeral; - vector<char> buffer; - vector<pair<watcher_fn, void*> > watchers; - - Node() - :exists(false), - ephemeral(false) - {} - - void addWatcher(watcher_fn fn, void* context) { - if (fn) - watchers.push_back(make_pair(fn, context)); - } - - void triggerWatches(zhandle_t* zh, const std::string& path); -}; - -std::shared_ptr<Node> sharedRoot; - -void doNothing() { } - -struct ZHandle { - struct Worker { - ZHandle& zhandle; - - Worker(ZHandle* parent) : zhandle(*parent) {} - - void operator()(); - }; - - int sequence; - - std::shared_ptr<Node> root; - std::atomic<bool> _closed; - std::thread _watchersThread; - vector<string> ephemeralNodes; - - typedef std::function<void (void)> InvokeWatcherFun; - ConcurrentQueue<InvokeWatcherFun> watcherInvocations; - - Node& getNode(const string& path); - - Node& getParent(const string& path); - - void ephemeralNode(const string&path) { - ephemeralNodes.push_back(path); - } - - ZHandle() : sequence(0), _closed(false), _watchersThread(Worker(this)) { - if (!sharedRoot) - sharedRoot.reset(new Node()); - - root = sharedRoot; - } - - ~ZHandle() { - std::for_each(ephemeralNodes.begin(), ephemeralNodes.end(), - [this] (const string & s) { zoo_delete((zhandle_t*)this, s.c_str(), 0); }); - close(); - _watchersThread.join(); - } - void close() { - _closed.store(true); - watcherInvocations.push(std::ref(doNothing)); - } -}; - -void -ZHandle::Worker::operator()() -{ - while (! zhandle._closed.load()) { - InvokeWatcherFun fun = zhandle.watcherInvocations.pop(); - fun(); - } -} - -Node& ZHandle::getNode(const string& path) { - auto splittedPath = parentPathAndChildName(path); - if (splittedPath.second.empty()) { - return *root; - } else { - return getNode(splittedPath.first).children[splittedPath.second]; - } -} - -Node& -ZHandle::getParent(const string& childPath) -{ - auto splittedPath = parentPathAndChildName(childPath); - if (splittedPath.second.empty()) { - throw "Can't get parent of root."; - } else { - return getNode(splittedPath.first); - } -} - -void -Node::triggerWatches(zhandle_t* zh, const std::string& path) { - for (auto i = watchers.begin(); i != watchers.end(); ++i) { - ((ZHandle*)zh)->watcherInvocations.push([zh, i, path] () { i->first(zh, 0, 0, path.c_str(), i->second); }); - } - watchers.clear(); -} - -} //anonymous namespace - -extern "C" { - -ZOOAPI void zoo_set_debug_level(ZooLogLevel) {} -ZOOAPI zhandle_t *zookeeper_init(const char * host, watcher_fn fn, - int recv_timeout, const clientid_t *clientid, void *context, int flags) -{ - (void)host; - (void)fn; - (void)recv_timeout; - (void)clientid; - (void)context; - (void)flags; - - return (zhandle_t*)new ZHandle; -} - -ZOOAPI int zookeeper_close(zhandle_t *zh) -{ - delete (ZHandle*)zh; - return 0; -} - -ZOOAPI int zoo_create(zhandle_t *zh, const char *pathOrPrefix, const char *value, - int valuelen, const struct ACL_vector *, int flags, - char *path_buffer, int path_buffer_len) -{ - std::string path = pathOrPrefix; - if (flags & ZOO_SEQUENCE) - path += boost::lexical_cast<std::string>(((ZHandle*)zh)->sequence++); - - strncpy(path_buffer, path.c_str(), path_buffer_len); - Node& node = ((ZHandle*)zh)->getNode(path); - node.exists = true; - - if (flags & ZOO_EPHEMERAL) - ((ZHandle*)zh)->ephemeralNode(path); - - node.buffer.resize(valuelen); - std::copy(value, value + valuelen, node.buffer.begin()); - - - node.triggerWatches(zh, path); - ((ZHandle*)zh)->getParent(path).triggerWatches(zh, - parentPathAndChildName(path).first); - - return 0; -} - - -ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer, - int buflen, int version) { - (void)version; - - Node& node = ((ZHandle*)zh)->getNode(path); - if (!node.exists) - return ZNONODE; - - - node.buffer.resize(buflen); - std::copy(buffer, buffer + buflen, node.buffer.begin()); - - node.triggerWatches(zh, path); - return 0; -} - - - -ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch, - struct String_vector *strings) -{ - (void)watch; - return zoo_wget_children(zh, path, - 0, 0, - strings); -} - -ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path, - watcher_fn watcher, void* watcherCtx, - struct String_vector *strings) -{ - Node& node = ((ZHandle*)zh)->getNode(path); - strings->count = node.children.size(); - strings->data = new char*[strings->count]; - - int index = 0; - for (auto i = node.children.begin(); i != node.children.end(); ++i) { - strings->data[index] = new char[i->first.length() + 1]; - std::strcpy(strings->data[index], &*i->first.begin()); - ++index; - } - - node.addWatcher(watcher, watcherCtx); - - return 0; -} - - - - -ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version) -{ - (void)version; - - std::string pathStr = path; - int index = pathStr.find_last_of("/"); - - if (pathStr.length() == 1) - throw "Can't delete root"; - - Node& parent = ((ZHandle*)zh)->getNode(pathStr.substr(0, index)); - parent.children.erase(pathStr.substr(index + 1)); - - ((ZHandle*)zh)->getParent(path).triggerWatches(zh, - parentPathAndChildName(path).first); - - return 0; -} - -void zoo_set_log_stream(FILE*) {} - -int deallocate_String_vector(struct String_vector *v) { - for (int i=0; i< v->count; ++i) { - delete[] v->data[i]; - } - delete[] v->data; - return 0; -} - - -ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer, - int* buffer_len, struct Stat *stat) -{ - (void)watch; - - return zoo_wget(zh, path, - 0, 0, - buffer, buffer_len, stat); - -} - -ZOOAPI int zoo_wget(zhandle_t *zh, const char *path, - watcher_fn watcher, void* watcherCtx, - char *buffer, int* buffer_len, struct Stat *) -{ - Node& node = ((ZHandle*)zh)->getNode(path); - std::copy(node.buffer.begin(), node.buffer.end(), buffer); - *buffer_len = node.buffer.size(); - - node.addWatcher(watcher, watcherCtx); - return 0; -} - -ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path, - watcher_fn watcher, void* watcherCtx, struct Stat *) -{ - Node& node = ((ZHandle*)zh)->getNode(path); - - node.addWatcher(watcher, watcherCtx); - return node.exists ? ZOK : ZNONODE; -} - -ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat) -{ - (void)watch; - return zoo_wexists(zh, path, - 0, 0, - stat); -} - - - - -ZOOAPI ACL_vector ZOO_OPEN_ACL_UNSAFE; - -ZOOAPI const int ZOO_SEQUENCE = 1; -ZOOAPI const int ZOO_EPHEMERAL = 2; -ZOOAPI const int ZOO_SESSION_EVENT = 3; -ZOOAPI const int ZOO_EXPIRED_SESSION_STATE = 4; -ZOOAPI const int ZOO_AUTH_FAILED_STATE = 5; -} diff --git a/filedistribution/src/tests/rpc/.gitignore b/filedistribution/src/tests/rpc/.gitignore deleted file mode 100644 index b29a47efd87..00000000000 --- a/filedistribution/src/tests/rpc/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_rpc_test_app diff --git a/filedistribution/src/tests/rpc/CMakeLists.txt b/filedistribution/src/tests/rpc/CMakeLists.txt deleted file mode 100644 index 1882959f94f..00000000000 --- a/filedistribution/src/tests/rpc/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_rpc_test_app TEST - SOURCES - testfileprovider.cpp - DEPENDS - filedistribution_filedistributorrpc - filedistribution_common -) -target_compile_options(filedistribution_rpc_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_rpc_test_app NO_VALGRIND COMMAND filedistribution_rpc_test_app) diff --git a/filedistribution/src/tests/rpc/mockfileprovider.h b/filedistribution/src/tests/rpc/mockfileprovider.h deleted file mode 100644 index be0a6100165..00000000000 --- a/filedistribution/src/tests/rpc/mockfileprovider.h +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/filedistribution/rpc/fileprovider.h> -#include <boost/thread/barrier.hpp> - -namespace filedistribution { - -class MockFileProvider : public FileProvider { - DownloadCompletedSignal _downloadCompleted; - DownloadFailedSignal _downloadFailed; -public: - static const std::string _queueForeverFileReference; - - boost::barrier _queueForeverBarrier; - - boost::optional<Path> getPath(const std::string& fileReference) override { - if (fileReference == "dd") { - return Path("direct/result/path"); - } else { - return boost::optional<Path>(); - } - } - - void downloadFile(const std::string& fileReference) override { - if (fileReference == _queueForeverFileReference) { - _queueForeverBarrier.wait(); - return; - } - - sleep(1); - downloadCompleted()(fileReference, "downloaded/path/" + fileReference); - } - - DownloadCompletedSignal& downloadCompleted() override { - return _downloadCompleted; - } - - DownloadFailedSignal& downloadFailed() override { - return _downloadFailed; - } - - MockFileProvider() - :_queueForeverBarrier(2) - {} -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/tests/rpc/testfileprovider.cpp b/filedistribution/src/tests/rpc/testfileprovider.cpp deleted file mode 100644 index 2879258272a..00000000000 --- a/filedistribution/src/tests/rpc/testfileprovider.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN - -#include "mockfileprovider.h" -#include <vespa/filedistribution/rpc/filedistributorrpc.h> -#include <vespa/frtstream/frtclientstream.h> -#include <vespa/fnet/frt/rpcrequest.h> -#include <vespa/fnet/frt/target.h> -#include <boost/test/unit_test.hpp> - -namespace fd = filedistribution; - -using fd::MockFileProvider; - -const std::string MockFileProvider::_queueForeverFileReference("queue-forever"); - -BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) { - const std::string spec("tcp/localhost:9111"); - fd::FileProvider::SP provider(new fd::MockFileProvider()); - fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); - fileDistributorRPC->start(); - - frtstream::FrtClientStream rpc(spec); - frtstream::Method method("waitFor"); - - std::string path; - rpc <<method <<"dd"; - rpc >> path; - BOOST_CHECK_EQUAL("direct/result/path", path); - - rpc <<method <<"0123456789abcdef"; - rpc >> path; - BOOST_CHECK_EQUAL("downloaded/path/0123456789abcdef", path); -} - -//must be run through valgrind -BOOST_AUTO_TEST_CASE(require_that_queued_requests_does_not_leak_memory) { - const std::string spec("tcp/localhost:9111"); - std::shared_ptr<MockFileProvider> provider(new MockFileProvider()); - fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider)); - fileDistributorRPC->start(); - - FRT_Supervisor supervisor; - - supervisor.Start(); - FRT_Target *target = supervisor.GetTarget(spec.c_str()); - - FRT_RPCRequest* request = supervisor.AllocRPCRequest(); - request->SetMethodName("waitFor"); - request->GetParams()->AddString(MockFileProvider::_queueForeverFileReference.c_str()); - target->InvokeVoid(request); - - provider->_queueForeverBarrier.wait(); //the request has been enqueued. - fileDistributorRPC.reset(); - - target->SubRef(); - supervisor.ShutDown(true); - -} - -BOOST_AUTO_TEST_CASE(require_that_port_can_be_extracted_from_connection_spec) { - BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/host:9056")); - BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/9056")); - BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("9056")); -} diff --git a/filedistribution/src/tests/scheduler/.gitignore b/filedistribution/src/tests/scheduler/.gitignore deleted file mode 100644 index b1976d1c516..00000000000 --- a/filedistribution/src/tests/scheduler/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_scheduler_test_app diff --git a/filedistribution/src/tests/scheduler/CMakeLists.txt b/filedistribution/src/tests/scheduler/CMakeLists.txt deleted file mode 100644 index 763d87cfc64..00000000000 --- a/filedistribution/src/tests/scheduler/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_scheduler_test_app TEST - SOURCES - test-scheduler.cpp - DEPENDS - filedistribution_distributor - filedistribution_filedistributionmodel - filedistribution_common -) -target_compile_options(filedistribution_scheduler_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_scheduler_test_app NO_VALGRIND COMMAND filedistribution_scheduler_test_app) diff --git a/filedistribution/src/tests/scheduler/test-scheduler.cpp b/filedistribution/src/tests/scheduler/test-scheduler.cpp deleted file mode 100644 index d5afe7e4b11..00000000000 --- a/filedistribution/src/tests/scheduler/test-scheduler.cpp +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN - -#include <boost/test/unit_test.hpp> - -#include <vespa/filedistribution/distributor/scheduler.h> - -#include <iostream> - -#include <boost/thread/barrier.hpp> - -using filedistribution::Scheduler; -using namespace std::literals; - -namespace asio = boost::asio; - -class TestException {}; - - -struct CallRun { - volatile bool _caughtException; - - CallRun() - :_caughtException(false) - {} - - void operator()(asio::io_service& ioService) { - try { - //No reset needed after handling exceptions. - ioService.run(); - } catch(const TestException& e ) { - _caughtException = true; - } - } -}; - -struct Fixture { - CallRun callRun; - Scheduler scheduler; - - Fixture() - : scheduler(std::ref(callRun)) - {} -}; - - -BOOST_FIXTURE_TEST_SUITE(SchedulerTest, Fixture) - - -struct RepeatedTask : Scheduler::Task { - void doHandle() override { - std::cout <<"RepeatedTask::doHandle " <<std::endl; - schedule(boost::posix_time::seconds(1)); - } - - RepeatedTask(Scheduler& scheduler) : Task(scheduler) {} -}; - -BOOST_AUTO_TEST_CASE(require_tasks_does_not_keep_scheduler_alive) { - RepeatedTask::SP task(new RepeatedTask(scheduler)); - task->schedule(boost::posix_time::hours(10)); -} - -struct EnsureInvokedTask : Scheduler::Task { - boost::barrier& _barrier; - - void doHandle() override { - _barrier.wait(); - } - - EnsureInvokedTask(Scheduler& scheduler, boost::barrier& barrier) : - Task(scheduler), - _barrier(barrier) - {} -}; - - -BOOST_AUTO_TEST_CASE(require_task_invoked) { - boost::barrier barrier(2); - - EnsureInvokedTask::SP task(new EnsureInvokedTask(scheduler, barrier)); - task->schedule(boost::posix_time::milliseconds(50)); - - barrier.wait(); -} - -struct ThrowExceptionTask : Scheduler::Task { - void doHandle() override { - throw TestException(); - } - - ThrowExceptionTask(Scheduler& scheduler) : - Task(scheduler) - {} -}; - -BOOST_AUTO_TEST_CASE(require_exception_from_tasks_can_be_caught) { - ThrowExceptionTask::SP task(new ThrowExceptionTask(scheduler)); - task->scheduleNow(); - - for (int i=0; i<200 && !callRun._caughtException; ++i) { - std::this_thread::sleep_for(100ms); - } - - BOOST_CHECK(callRun._caughtException); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/tests/status/.gitignore b/filedistribution/src/tests/status/.gitignore deleted file mode 100644 index 3da528fcd45..00000000000 --- a/filedistribution/src/tests/status/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_status_test_app diff --git a/filedistribution/src/tests/status/CMakeLists.txt b/filedistribution/src/tests/status/CMakeLists.txt deleted file mode 100644 index 646cc1351a9..00000000000 --- a/filedistribution/src/tests/status/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_status_test_app TEST - SOURCES - test-status.cpp - DEPENDS - filedistribution_filedistributionmodel - filedistribution_common -) -target_compile_options(filedistribution_status_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_status_test_app NO_VALGRIND COMMAND filedistribution_status_test_app) diff --git a/filedistribution/src/tests/status/test-status.cpp b/filedistribution/src/tests/status/test-status.cpp deleted file mode 100644 index f50626aabdc..00000000000 --- a/filedistribution/src/tests/status/test-status.cpp +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN -#include <boost/test/unit_test.hpp> - -#include <vespa/filedistribution/model/zkfacade.h> -#include <vespa/filedistribution/model/filedistributionmodel.h> -#include <vespa/filedistribution/model/filedistributionmodelimpl.h> - -using namespace filedistribution; - - -BOOST_AUTO_TEST_CASE(test_retrieve_status) { - // TODO: -} - diff --git a/filedistribution/src/tests/zkfacade/.gitignore b/filedistribution/src/tests/zkfacade/.gitignore deleted file mode 100644 index 6ffef2339b1..00000000000 --- a/filedistribution/src/tests/zkfacade/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_zkfacade_test_app diff --git a/filedistribution/src/tests/zkfacade/CMakeLists.txt b/filedistribution/src/tests/zkfacade/CMakeLists.txt deleted file mode 100644 index 18dc8121049..00000000000 --- a/filedistribution/src/tests/zkfacade/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_zkfacade_test_app - SOURCES - test-zkfacade.cpp - DEPENDS - filedistribution_filedistributionmodel - filedistribution_common - filedistribution_mocks -) -target_compile_options(filedistribution_zkfacade_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) diff --git a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp deleted file mode 100644 index de4be087432..00000000000 --- a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN -#define BOOST_TEST_MODULE zkfacade test - -#include <boost/test/unit_test.hpp> -#include <iostream> -#include <boost/thread/barrier.hpp> -#include <vespa/filedistribution/common/componentsdeleter.h> -#include <vespa/filedistribution/model/zkfacade.h> - -#include <zookeeper/zookeeper.h> - - -using namespace std::literals; -using namespace filedistribution; - -namespace { - - -struct Watcher : public ZKFacade::NodeChangedWatcher { - boost::barrier _barrier; - - Watcher() : - _barrier(2) {} - - void operator()() override { - _barrier.wait(); - } -}; - -struct Fixture { - ComponentsDeleter _componentsDeleter; - std::shared_ptr<ZKFacade> zk; - Path testNode; - - Fixture() { - zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", false)); - - testNode = "/test-node"; - zk->removeIfExists(testNode); - } - - ~Fixture() { - if (zk) { - zk->removeIfExists(testNode); - } - } -}; - -} //anonymous namespace - - -BOOST_FIXTURE_TEST_SUITE(ZKFacadeTests, Fixture) - -BOOST_AUTO_TEST_CASE(hasNode) -{ - zk->setData(testNode, "", 0); - BOOST_CHECK(zk->hasNode(testNode)); - - zk->remove(testNode); - BOOST_CHECK(!zk->hasNode(testNode)); -} - -BOOST_AUTO_TEST_CASE(getValidZKServers) -{ - BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("localhost:22", false)); - BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("localhost:22", true)); - BOOST_CHECK_EQUAL("idonotexist:22", ZKFacade::getValidZKServers("idonotexist:22", false)); - BOOST_CHECK_EQUAL("", ZKFacade::getValidZKServers("idonotexist:22", true)); - BOOST_CHECK_EQUAL("localhost:22,idonotexist:22", ZKFacade::getValidZKServers("localhost:22,idonotexist:22", false)); - BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("localhost:22,idonotexist:22", true)); - BOOST_CHECK_EQUAL("idonotexist:22,localhost:22", ZKFacade::getValidZKServers("idonotexist:22,localhost:22", false)); - BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("idonotexist:22,localhost:22", true)); -} - -BOOST_AUTO_TEST_CASE(hasNodeNotification) -{ - std::shared_ptr<Watcher> watcher(new Watcher); - - zk->hasNode(testNode, watcher); - zk->setData(testNode, "", 0); - watcher->_barrier.wait(); - - //after the notification has returned, the watcher must no longer reside in watchers map. - for (int i=0; i<20 && !watcher.unique(); ++i) { - std::this_thread::sleep_for(100ms); - } - BOOST_CHECK(watcher.unique()); -} - -BOOST_AUTO_TEST_CASE(getAndSetData) -{ - std::string inputString = "test data."; - Buffer inputBuffer(inputString.begin(), inputString.end()); - - zk->setData(testNode, inputBuffer); - - Buffer outputBuffer(zk->getData(testNode)); - std::string outputString(outputBuffer.begin(), outputBuffer.end()); - - BOOST_CHECK(outputString == inputString); - - outputString = zk->getString(testNode); - BOOST_CHECK(outputString == inputString); -} - -BOOST_AUTO_TEST_CASE(setDataMustExist) -{ - bool mustExist = true; - BOOST_REQUIRE_THROW(zk->setData(testNode, "", 0, mustExist), ZKNodeDoesNotExistsException); -} - -BOOST_AUTO_TEST_CASE(createSequenceNode) -{ - zk->setData(testNode, "", 0); - - Path prefix = testNode / "prefix"; - zk->createSequenceNode(prefix, "test", 4); - zk->createSequenceNode(prefix, "test", 4); - zk->createSequenceNode(prefix, "test", 4); - - std::vector<std::string> children = zk->getChildren(testNode); - BOOST_CHECK(children.size() == 3); - BOOST_CHECK(children.begin()->substr(0,6) == "prefix"); - - Buffer buffer(zk->getData(testNode / *children.begin())); - std::string bufferContent(buffer.begin(), buffer.end()); - - BOOST_CHECK(bufferContent == "test"); -} - -BOOST_AUTO_TEST_CASE(retainOnly) -{ - zk->setData(testNode, "", 0); - - zk->setData(testNode / "a", "", 0); - zk->setData(testNode / "b", "", 0); - zk->setData(testNode / "c", "", 0); - zk->setData(testNode / "d", "", 0); - - std::vector<std::string> toRetain; - toRetain.push_back("a"); - toRetain.push_back("c"); - - zk->retainOnly(testNode, toRetain); - std::vector<std::string> children = zk->getChildren(testNode); - - std::sort(children.begin(), children.end()); - BOOST_CHECK(children == toRetain); -} - - - -BOOST_AUTO_TEST_CASE(addEphemeralNode) -{ - Path ephemeralNode = "/test-ephemeral-node"; - zk->removeIfExists(ephemeralNode); - - //Checked deleter is ok here since we're not installing any watchers - ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181", false), boost::checked_deleter<ZKFacade>()); - zk2->addEphemeralNode(ephemeralNode); - - BOOST_CHECK(zk->hasNode(ephemeralNode)); - zk2.reset(); - BOOST_CHECK(!zk->hasNode(ephemeralNode)); -} - - - -BOOST_AUTO_TEST_CASE(dataChangedNotification) -{ - std::shared_ptr<Watcher> watcher(new Watcher); - - zk->setData(testNode, "", 0); - Buffer buffer(zk->getData(testNode, watcher)); - BOOST_CHECK(buffer.size() == 0); - - bool mustExist = true; - zk->setData(testNode, "test", 4, mustExist); - watcher->_barrier.wait(); -} - -BOOST_AUTO_TEST_CASE(getChildrenNotification) -{ - std::shared_ptr<Watcher> watcher(new Watcher); - - zk->setData(testNode, "", 0); - zk->getChildren(testNode, watcher); - - zk->setData(testNode / "child", "", 0); - watcher->_barrier.wait(); -} - -BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback) -{ - struct DeleteZKFacadeWatcher : public Watcher { - std::shared_ptr<ZKFacade> _zk; - - DeleteZKFacadeWatcher(const std::shared_ptr<ZKFacade>& zk) - :_zk(zk) - {} - - void operator()() override { - BOOST_CHECK(_zk.use_count() == 2); - _zk.reset(); - Watcher::operator()(); - } - }; - - std::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk)); - - zk->setData(testNode, "", 0); - zk->getData(testNode, watcher); - - ZKFacade* unprotectedZk = zk.get(); - zk.reset(); - - unprotectedZk->setData(testNode, "t", 1); - watcher->_barrier.wait(); - - //Must wait longer than the zookeeper_close timeout to catch - //problems due to closing zookeeper in a zookeeper watcher thread. - sleep(3); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/tests/zkfiledbmodel/.gitignore b/filedistribution/src/tests/zkfiledbmodel/.gitignore deleted file mode 100644 index 18c14c47c56..00000000000 --- a/filedistribution/src/tests/zkfiledbmodel/.gitignore +++ /dev/null @@ -1 +0,0 @@ -filedistribution_zkfiledbmodel_test_app diff --git a/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt b/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt deleted file mode 100644 index e21d0bacf6e..00000000000 --- a/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(filedistribution_zkfiledbmodel_test_app TEST - SOURCES - test-zkfiledbmodel.cpp - DEPENDS - filedistribution_filedistributionmodel - filedistribution_common - filedistribution_mocks -) -target_compile_options(filedistribution_zkfiledbmodel_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX}) -vespa_add_test(NAME filedistribution_zkfiledbmodel_test_app NO_VALGRIND COMMAND filedistribution_zkfiledbmodel_test_app) diff --git a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp deleted file mode 100644 index 20fc4364dc8..00000000000 --- a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#define BOOST_TEST_DYN_LINK -#define BOOST_TEST_MAIN -#define BOOST_TEST_MODULE zkfiledbmodel test -#include <boost/test/unit_test.hpp> - -#include <iostream> - -#include <vespa/filedistribution/common/componentsdeleter.h> -#include <vespa/filedistribution/model/zkfacade.h> -#include <vespa/filedistribution/model/zkfiledbmodel.h> - -#include <zookeeper/zookeeper.h> - - -using namespace filedistribution; - -namespace { - -struct Fixture { - ComponentsDeleter _componentsDeleter; - std::shared_ptr<ZKFacade> zk; - std::shared_ptr<ZKFileDBModel> model; - - Fixture() { - zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", false)); - zk->setData("/vespa", "", 0); - - model = _componentsDeleter.track(new ZKFileDBModel(zk)); - } -}; - -} //anonymous namespace - - -BOOST_FIXTURE_TEST_SUITE(ZKFileDBModelTests, Fixture) - -BOOST_AUTO_TEST_CASE(retainOnlyHostsForTenant) -{ - Path path = "/vespa/filedistribution/hosts"; - std::vector<std::string> files = {"myfile"}; - BOOST_CHECK(zk->hasNode("/vespa")); - BOOST_CHECK(zk->hasNode("/vespa/filedistribution")); - BOOST_CHECK(zk->hasNode(path)); - model->setDeployedFilesToDownload("testhost", "myapp:so:cool", files); - model->setDeployedFilesToDownload("testhost2", "myapp:so:cool", files); - model->setDeployedFilesToDownload("testhost3", "myapp:so:cool", files); - model->setDeployedFilesToDownload("testhost3", "myapp:legacyid:so:cool", files); - model->setDeployedFilesToDownload("testhost3", "yourapp:so:cool", files); - BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1); - BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); - BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 3); - - model->cleanDeployedFilesToDownload({"testhost3"}, "yourapp:so:cool"); - model->removeDeploymentsThatHaveDifferentApplicationId({"testhost3"}, "yourapp:so:cool"); - BOOST_CHECK(zk->hasNode(path / "testhost")); - BOOST_CHECK(zk->hasNode(path / "testhost2")); - BOOST_CHECK(zk->hasNode(path / "testhost3")); - BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1); - BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); - BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1); - - model->cleanDeployedFilesToDownload({"testhost"}, "myapp:not:cool"); - model->removeDeploymentsThatHaveDifferentApplicationId({"testhost"}, "myapp:not:cool"); - BOOST_CHECK(zk->hasNode(path / "testhost")); - BOOST_CHECK(zk->hasNode(path / "testhost2")); - BOOST_CHECK(zk->hasNode(path / "testhost3")); - BOOST_CHECK(zk->getChildren(path / "testhost").size() == 0); - BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); - BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1); - - model->cleanDeployedFilesToDownload({"testhost2"}, "myapp:so:cool"); - model->removeDeploymentsThatHaveDifferentApplicationId({"testhost2"}, "myapp:so:cool"); - - BOOST_CHECK(!zk->hasNode(path / "testhost")); - BOOST_CHECK(zk->hasNode(path / "testhost2")); - BOOST_CHECK(zk->hasNode(path / "testhost3")); - BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1); - BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1); - - model->cleanDeployedFilesToDownload({"testhost2"}, "yourapp:so:cool"); - BOOST_CHECK(!zk->hasNode(path / "testhost")); - BOOST_CHECK(zk->hasNode(path / "testhost2")); - BOOST_CHECK(!zk->hasNode(path / "testhost3")); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/filedistribution/src/vespa/.gitignore b/filedistribution/src/vespa/.gitignore deleted file mode 100644 index a477b4a6e5f..00000000000 --- a/filedistribution/src/vespa/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -Makefile -config-*.h -config-*.cpp diff --git a/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt deleted file mode 100644 index 129b084b400..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(filedistribution_exceptionrethrower OBJECT - SOURCES - exception.cpp - DEPENDS -) -vespa_add_library(filedistribution_common STATIC - SOURCES - componentsdeleter.cpp - exception.cpp - vespa_logfwd.cpp - DEPENDS -) -target_compile_options(filedistribution_common PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) -vespa_add_target_system_dependency(filedistribution_common boost boost_thread${VESPA_BOOST_LIB_SUFFIX}) diff --git a/filedistribution/src/vespa/filedistribution/common/buffer.h b/filedistribution/src/vespa/filedistribution/common/buffer.h deleted file mode 100644 index 258f43be90c..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/buffer.h +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <algorithm> - -namespace filedistribution { - -class Buffer { - size_t _capacity; - char* _buf; - size_t _size; -public: - typedef char value_type; - typedef const value_type& const_reference; - typedef char* iterator; - typedef const char* const_iterator; - - Buffer(const Buffer &) = delete; - Buffer & operator = (const Buffer &) = delete; - explicit Buffer(size_t capacityArg) - :_capacity(capacityArg), - _buf( new char[_capacity] ), - _size(0) - {} - - Buffer(Buffer && rhs) : - _capacity(rhs._capacity), - _buf(rhs._buf), - _size(rhs._size) - { - rhs._capacity = 0; - rhs._size = 0; - rhs._buf = nullptr; - } - - template <typename ITER> - Buffer(ITER beginIter, ITER endIter) - : _capacity(endIter-beginIter), - _buf( new char[_capacity] ), - _size(_capacity) - { - std::copy(beginIter, endIter, begin()); - } - - ~Buffer() { - delete[] _buf; - } - - size_t capacity() const { return _capacity; } - size_t size() const { return _size; } - - //might expose uninitialized memory - void resize(size_t newSize) { - if ( newSize <= _capacity ) - _size = newSize; - else { - reserve(newSize); - _size = newSize; - } - } - - void reserve(size_t newCapacity) { - if ( newCapacity > _capacity ) { - Buffer buffer(newCapacity); - buffer._size = _size; - std::copy(begin(), end(), buffer.begin()); - buffer.swap(*this); - } - } - - void swap(Buffer& other) { - std::swap(_capacity, other._capacity); - std::swap(_buf, other._buf); - std::swap(_size, other._size); - } - - void push_back(char c) { - if (_size == _capacity) - reserve(_capacity * 2); - _buf[_size++] = c; - } - - iterator begin() { return _buf; } - iterator end() { return _buf + _size; } - const_iterator begin() const { return _buf; } - const_iterator end() const { return _buf + _size; } - char operator[](size_t i) const { return _buf[i]; } - char& operator[](size_t i) { return _buf[i]; } -}; - -} //namespace filedistribution - - diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp deleted file mode 100644 index 1dc0975a925..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "componentsdeleter.h" -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".componentsdeleter"); - -using namespace std::literals; -using namespace filedistribution; - -struct ComponentsDeleter::Worker { - ComponentsDeleter& _parent; - - Worker(ComponentsDeleter* parent) - :_parent(*parent) {} - - void operator()(); -}; - - -void -ComponentsDeleter::Worker::operator()() -{ - while ( ! _parent.areWeDone() ) { - CallDeleteFun deleteFun = _parent._deleteRequests.pop(); - deleteFun(); - } -} - -ComponentsDeleter::ComponentsDeleter() : - _closed(false), - _deleterThread(Worker(this)) -{} - -ComponentsDeleter::~ComponentsDeleter() -{ - close(); - waitForAllComponentsDeleted(); - _deleterThread.join(); -} - -void -ComponentsDeleter::waitForAllComponentsDeleted() -{ - LOG(debug, "Waiting for all components to be deleted"); - - for (int i=0; i<600 && !areWeDone(); ++i) { - std::this_thread::sleep_for(100ms); - } - LOG(debug, "Done waiting for all components to be deleted"); - assert(_trackedComponents.empty()); - assert(_deleteRequests.empty()); -} - -void -ComponentsDeleter::close() -{ - { - LockGuard guard(_trackedComponentsMutex); - _closed = true; - } - _deleteRequests.push([]() { LOG(debug, "I am the last one, hurry up and shutdown"); }); -} - -bool -ComponentsDeleter::areWeDone() -{ - LockGuard guard(_trackedComponentsMutex); - return _closed && _trackedComponents.empty() && _deleteRequests.empty(); -} - -void -ComponentsDeleter::removeFromTrackedComponents(void* component) { - LockGuard guard(_trackedComponentsMutex); - if (_trackedComponents.count(component)) - LOG(debug, "Deleting '%s'", _trackedComponents[component].c_str()); - - size_t numErased = _trackedComponents.erase(component); - assert(numErased == 1); - (void) numErased; -} diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h deleted file mode 100644 index 65860d3ad66..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "concurrentqueue.h" - -#include <map> -#include <typeinfo> -#include <string> -#include <mutex> -#include <thread> -#include <functional> - - -namespace filedistribution { -/** - * Ensures that components are deleted in a separate thread, - * and that their lifetime is tracked. - * - * This prevents situations as e.g. deleting ZKFacade from a zookeeper watcher thread. - */ -class ComponentsDeleter { - class Worker; - typedef std::lock_guard<std::mutex> LockGuard; - - std::mutex _trackedComponentsMutex; - typedef std::map<void*, std::string> TrackedComponentsMap; - TrackedComponentsMap _trackedComponents; - - typedef std::function<void (void)> CallDeleteFun; - ConcurrentQueue<CallDeleteFun> _deleteRequests; - bool _closed; - std::thread _deleterThread; - - void removeFromTrackedComponents(void* component); - - template <class T> - void deleteComponent(T* component) { - removeFromTrackedComponents(component); - delete component; - } - - template <class T> - void requestDelete(T* component) { - _deleteRequests.push([this, component]() { deleteComponent<T>(component); }); - } - - void waitForAllComponentsDeleted(); - bool areWeDone(); - void close(); - public: - ComponentsDeleter(const ComponentsDeleter &) = delete; - ComponentsDeleter & operator = (const ComponentsDeleter &) = delete; - ComponentsDeleter(); - - /* - * Waits blocking for up to 60 seconds until all components are deleted. - * If it fails, the application is killed. - */ - ~ComponentsDeleter(); - - template <class T> - std::shared_ptr<T> track(T* t) { - LockGuard guard(_trackedComponentsMutex); - if (_closed) { - return std::shared_ptr<T>(t); - } - - _trackedComponents[t] = typeid(t).name(); - return std::shared_ptr<T>(t, [this](T * p) { requestDelete<T>(p); }); - } -}; -} - diff --git a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h deleted file mode 100644 index db23b56c500..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <queue> - -#include <mutex> -#include <condition_variable> - -namespace filedistribution { - -template <typename T> -class ConcurrentQueue { -public: - typedef T value_type; -private: - std::condition_variable _nonEmpty; - - mutable std::mutex _queueMutex; - typedef std::unique_lock<std::mutex> UniqueLock; - - std::queue<value_type> _queue; - -public: - void push(const T& t) { - { - UniqueLock guard(_queueMutex); - _queue.push(t); - } - _nonEmpty.notify_one(); - } - - //Assumes that value_type has nonthrow copy constructors - const T pop() { - UniqueLock guard(_queueMutex); - while (_queue.empty()) { - _nonEmpty.wait(guard); - } - T result = _queue.front(); - _queue.pop(); - return result; - } - - void clear() { - UniqueLock guard(_queueMutex); - while (!_queue.empty()) { - _queue.pop(); - } - } - bool empty() { - UniqueLock guard(_queueMutex); - return _queue.empty(); - } -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/common/exception.cpp b/filedistribution/src/vespa/filedistribution/common/exception.cpp deleted file mode 100644 index 499cbcea9b9..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/exception.cpp +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "exception.h" - -namespace filedistribution { - -VESPA_IMPLEMENT_EXCEPTION(FileDoesNotExistException, vespalib::Exception); - -} diff --git a/filedistribution/src/vespa/filedistribution/common/exception.h b/filedistribution/src/vespa/filedistribution/common/exception.h deleted file mode 100644 index bb47aa3c779..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/exception.h +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/vespalib/util/exceptions.h> -#include <boost/filesystem/path.hpp> - -namespace filedistribution { - -using Path = boost::filesystem::path; - -VESPA_DEFINE_EXCEPTION(FileDoesNotExistException, vespalib::Exception); - -} diff --git a/filedistribution/src/vespa/filedistribution/common/logfwd.h b/filedistribution/src/vespa/filedistribution/common/logfwd.h deleted file mode 100644 index 2732b9713fa..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/logfwd.h +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <cstdarg> - -namespace filedistribution { - -/** To avoid requiring vespa log from the jni library*/ -namespace logfwd { - -enum LogLevel { debug, error, warning, info }; - -void log_forward(LogLevel level, const char *file, int line, const char *fmt, ...) - __attribute__((format(printf,4,5))); - -} //namespace logfwd -} //namespace filedistribution - -#define LOGFWD(level, ...) filedistribution::logfwd::log_forward(filedistribution::logfwd::level, \ - __FILE__, __LINE__, __VA_ARGS__) - - diff --git a/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp b/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp deleted file mode 100644 index 0da06ded32d..00000000000 --- a/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "logfwd.h" -#include <vector> -#include <vespa/log/log.h> - -LOG_SETUP(".common.model"); - - -using ns_log::Logger; - -namespace { - Logger::LogLevel toVespaLogLevel(filedistribution::logfwd::LogLevel level) { - namespace l = filedistribution::logfwd; - - switch (level) { - case l::info: return Logger::info; - case l::debug: return Logger::debug; - case l::error: return Logger::error; - case l::warning: return Logger::warning; - default: - LOG(error, "Unknown log level, falling back to error"); - return Logger::error; - } - } -} - -void filedistribution::logfwd::log_forward(LogLevel level, const char* file, int line, const char* fmt, ...) -{ - - Logger::LogLevel vespaLogLevel = toVespaLogLevel(level); - - if (logger.wants(vespaLogLevel)) { - const size_t maxSize(0x8000); - std::vector<char> payload(maxSize); - char * buf = &payload[0]; - - va_list args; - va_start(args, fmt); - vsnprintf(buf, maxSize, fmt, args); - va_end(args); - - logger.doLog(vespaLogLevel, file, line, "%s", buf); - } -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt deleted file mode 100644 index 6b575dc5526..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(filedistribution_distributor STATIC - SOURCES - filedistributortrackerimpl.cpp - filedownloader.cpp - filedownloadermanager.cpp - hostname.cpp - scheduler.cpp - signalhandling.cpp - state_server_impl.cpp - DEPENDS -) -target_compile_options(filedistribution_distributor PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED) diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp deleted file mode 100644 index 185a620b7f3..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "filedistributortrackerimpl.h" -#include "filedownloader.h" -#include <vespa/filedistribution/model/zkfacade.h> - -#include <vespa/log/log.h> -LOG_SETUP(".filedistributiontrackerimpl"); - -using namespace filedistribution; - -typedef FileDistributionModel::PeerEntries PeerEntries; - -namespace asio = boost::asio; - -namespace { - -void -filterSelf(FileDistributionModel::PeerEntries& peers, - const std::string& hostName, - int port) -{ - FileDistributionModel::PeerEntries::iterator - i = peers.begin(), - currEnd = peers.end(); - - while ( i != currEnd ) { - //hostName is currently used in the ip field - if (i->ip == hostName && i->port == port) { - --currEnd; - std::swap(*i, *currEnd); - } else { - ++i; - } - } - - peers.erase(currEnd, peers.end()); -} - -void resolveIPAddresses(PeerEntries& peers) { - for (auto& p: peers) { - try { - p.ip = filedistribution::lookupIPAddress(p.ip); - } catch (filedistribution::FailedResolvingHostName& e) { - LOG(info, "Failed resolving address %s", p.ip.c_str()); - } - } -} - -struct TrackingTask : public Scheduler::Task { - int _numTimesRescheduled; - - libtorrent::tracker_request _trackerRequest; - boost::weak_ptr<libtorrent::torrent> _torrent; - std::weak_ptr<FileDownloader> _downloader; - std::shared_ptr<FileDistributionModel> _model; - - TrackingTask(Scheduler& scheduler, - const libtorrent::tracker_request& trackerRequest, - const TorrentSP & torrent, - const std::weak_ptr<FileDownloader>& downloader, - const std::shared_ptr<FileDistributionModel>& model); - ~TrackingTask(); - - //TODO: refactor - void doHandle() override; - PeerEntries getPeers(const std::shared_ptr<FileDownloader>& downloader); - void reschedule(); -}; - -TrackingTask::TrackingTask(Scheduler& scheduler, - const libtorrent::tracker_request& trackerRequest, - const TorrentSP & torrent, - const std::weak_ptr<FileDownloader>& downloader, - const std::shared_ptr<FileDistributionModel>& model) - : Task(scheduler), - _numTimesRescheduled(0), - _trackerRequest(trackerRequest), - _torrent(torrent), - _downloader(downloader), - _model(model) -{ } - -TrackingTask::~TrackingTask() {} - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Winline" -//TODO: refactor -void -TrackingTask::doHandle() { - if (std::shared_ptr<FileDownloader> downloader = _downloader.lock()) { - //All torrents must be destructed before the session is destructed. - //It's okay to prevent the torrent from expiring here - //since the session can't be destructed while - //we hold a shared_ptr to the downloader. - if (TorrentSP torrent = _torrent.lock()) { - PeerEntries peers = getPeers(downloader); - - if (!peers.empty()) { - torrent->session().m_io_service.dispatch( - [torrent_weak_ptr = _torrent, trackerRequest = _trackerRequest, peers = peers]() mutable { - if (auto torrent_sp = torrent_weak_ptr.lock()) { - torrent_sp->tracker_response( - trackerRequest, - libtorrent::address(), - std::list<libtorrent::address>(), - peers, - -1, -1, -1, -1, -1, - libtorrent::address(), "trackerid"); - } - }); - } - - if (peers.size() < 5) { - reschedule(); - } - } - } -} -#pragma GCC diagnostic pop - -PeerEntries -TrackingTask::getPeers(const std::shared_ptr<FileDownloader>& downloader) { - std::string fileReference = downloader->infoHash2FileReference(_trackerRequest.info_hash); - - const size_t recommendedMaxNumberOfPeers = 30; - PeerEntries peers = _model->getPeers(fileReference, recommendedMaxNumberOfPeers); - - //currently, libtorrent stops working if it tries to connect to itself. - filterSelf(peers, downloader->_hostName, downloader->_port); - resolveIPAddresses(peers); - for (const auto& peer: peers) { - LOG(debug, "Returning peer with ip %s", peer.ip.c_str()); - } - - return peers; -} - -void -TrackingTask::reschedule() { - if (_numTimesRescheduled < 5) { - double fudgeFactor = 0.1; - schedule(boost::posix_time::seconds(static_cast<int>( - std::pow(3., _numTimesRescheduled) + fudgeFactor))); - _numTimesRescheduled++; - } -} - -} //anonymous namespace - -FileDistributorTrackerImpl::FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model) : - _model(model) -{} - -FileDistributorTrackerImpl::~FileDistributorTrackerImpl() { - LOG(debug, "Deconstructing FileDistributorTrackerImpl"); - - LockGuard guard(_mutex); - _scheduler.reset(); -} - -void -FileDistributorTrackerImpl::trackingRequest( - libtorrent::tracker_request& request, - const TorrentSP & torrent) -{ - LockGuard guard(_mutex); - - if (torrent != TorrentSP()) { - std::shared_ptr<TrackingTask> trackingTask(new TrackingTask( - *_scheduler.get(), request, torrent, _downloader, _model)); - - trackingTask->scheduleNow(); - } -} - -void asioWorker(asio::io_service& ioService) -{ - while (!ioService.stopped()) { - try { - ioService.run(); - } catch (const ZKConnectionLossException & e) { - LOG(info, "Connection loss in asioWorker thread, resuming. %s", e.what()); - } catch (const ZKOperationTimeoutException & e) { - LOG(warning, "Operation timed out in asioWorker thread, will do quick exit to start a clean sheet. %s", e.what()); - std::quick_exit(31); - } - } -} - -void -FileDistributorTrackerImpl::setDownloader(const std::shared_ptr<FileDownloader>& downloader) -{ - LockGuard guard(_mutex); - - _scheduler.reset(); - _downloader = downloader; - - if (downloader) { - _scheduler.reset(new Scheduler([] (asio::io_service& ioService) { asioWorker(ioService); })); - } -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h deleted file mode 100644 index 5f701228631..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <libtorrent/session.hpp> -#include <libtorrent/torrent.hpp> - -#include <vespa/filedistribution/model/filedistributionmodel.h> -#include "scheduler.h" -#include <mutex> - -namespace filedistribution { -class FileDistributionModel; -class FileDownloader; - -using TorrentSP = boost::shared_ptr<libtorrent::torrent>; - -class FileDistributorTrackerImpl : public FileDistributionTracker { - const std::shared_ptr<FileDistributionModel> _model; - - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _mutex; - std::weak_ptr<FileDownloader> _downloader; - - //Use separate worker thread to avoid potential deadlock - //between tracker requests and files to download changed requests. - std::unique_ptr<Scheduler> _scheduler; -public: - FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model); - - virtual ~FileDistributorTrackerImpl(); - - //overrides - void trackingRequest(libtorrent::tracker_request& request, const TorrentSP & torrent) override; - - void setDownloader(const std::shared_ptr<FileDownloader>& downloader); -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp deleted file mode 100644 index 351cb5d5717..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp +++ /dev/null @@ -1,460 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "filedownloader.h" -#include "hostname.h" -#include <vespa/filedistribution/model/zkfacade.h> -#include <vespa/vespalib/util/stringfmt.h> - -#include <boost/filesystem.hpp> -#include <boost/filesystem/fstream.hpp> -#include <boost/filesystem/convenience.hpp> -#include <boost/function_output_iterator.hpp> - -#include <libtorrent/alert.hpp> -#include <libtorrent/alert_types.hpp> -#include <libtorrent/torrent_handle.hpp> -#include <libtorrent/bencode.hpp> - -#include <iterator> -#include <algorithm> - -#include <vespa/log/log.h> -LOG_SETUP(".filedownloader"); - -using namespace filedistribution; -namespace fs = boost::filesystem; - -using libtorrent::sha1_hash; -using libtorrent::torrent_handle; - -namespace { -const std::string resumeDataSuffix = ".resume"; -const std::string resumeDataSuffixTemp = ".resumetemp"; -const std::string newFileSuffix = ".new"; - -std::string -fileReferenceToString(const libtorrent::sha1_hash& fileReference) { - std::ostringstream fileReferenceString; - fileReferenceString <<fileReference; - - assert (fileReferenceString.str().size() == 40); - return fileReferenceString.str(); -} - -libtorrent::sha1_hash -toInfoHash(const std::string& fileReference) { - assert (fileReference.size() == 40); - std::istringstream s(fileReference); - - sha1_hash infoHash; - s >> infoHash; - return infoHash; -} - -void -addNewFile(const fs::path& dbPath, const fs::path& newFile) { - LOG(debug, "Adding new file: '%s'.", newFile.string().c_str()); - const fs::path destination = dbPath / newFile.stem(); - - if ( fs::exists(destination) ) { - fs::remove_all(destination); - } - - fs::path resumeData = destination.string() + resumeDataSuffix; - if ( fs::exists(resumeData) ) { - fs::remove(resumeData); - } - - fs::rename(newFile, destination); -} - -void -addNewDbFiles(const fs::path& dbPath) { - for (fs::directory_iterator i(dbPath), end; i != end; ++i) { - if (newFileSuffix == fs::extension(*i)) { - addNewFile(dbPath, *i); - } - } -} - -fs::path -resumeDataPath(const libtorrent::torrent_handle& torrent) { -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - fs::path tmp(torrent.save_path()); -#pragma GCC diagnostic pop - return tmp.string() + resumeDataSuffix; -} - -fs::path -resumeDataPathTemp(const libtorrent::torrent_handle& torrent) { -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - fs::path tmp(torrent.save_path()); -#pragma GCC diagnostic pop - return tmp.string() + resumeDataSuffixTemp; -} - -fs::path -getMainFile(const libtorrent::torrent_handle& handle) { - const libtorrent::torrent_info fallback(handle.info_hash()); - auto p = handle.torrent_file(); - const libtorrent::torrent_info& info = (p ? *p : fallback); - return info.files().num_files() == 1 ? - info.file_at(0).path : - info.name(); -} - -std::string -getMainName(const libtorrent::torrent_handle& handle) { - const libtorrent::torrent_info fallback(handle.info_hash()); - auto p = handle.torrent_file(); - const libtorrent::torrent_info& info = (p ? *p : fallback); - return info.name(); -} - -libtorrent::session_settings -createSessionSettings() { - libtorrent::session_settings s; - - const int unlimited = -1; - s.active_downloads = s.active_seeds = s.active_limit = unlimited; - - s.min_reconnect_time = 1; //seconds - s.min_announce_interval = 5 * 60; //seconds - return s; -} - -} //anonymous namespace - -namespace filedistribution { - VESPA_IMPLEMENT_EXCEPTION(NoSuchTorrentException, vespalib::Exception); -} - -struct FileDownloader::EventHandler -{ - FileDownloader& _fileDownloader; - - EventHandler(FileDownloader* fileDownloader) - : _fileDownloader(*fileDownloader) - {} - - void defaultHandler(const libtorrent::alert& alert) const { - LOG(debug, "alert %s: %s", alert.what(), alert.message().c_str()); - } - - void operator()(const libtorrent::listen_failed_alert& alert) const { - throw vespalib::PortListenException(alert.endpoint.port(), alert.endpoint.address().to_string(), alert.message(), VESPA_STRLOC); - } - void operator()(const libtorrent::fastresume_rejected_alert& alert) const { - LOG(debug, "alert %s: %s", alert.what(), alert.message().c_str()); - } - void operator()(const libtorrent::torrent_delete_failed_alert& alert) const { - LOG(warning, "alert %s: %s", alert.what(), alert.message().c_str()); - } - void operator()(const libtorrent::file_error_alert& alert) const { - LOG(error, "alert %s: %s", alert.what(), alert.message().c_str()); - } - - void operator()(const libtorrent::torrent_finished_alert& alert) const { - defaultHandler(alert); - - std::string fileReference = fileReferenceToString(alert.handle.info_hash()); - - LOG(debug, "File '%s' with file reference '%s' downloaded successfully.", - getMainName(alert.handle).c_str(), - fileReference.c_str()); - - _fileDownloader.signalIfFinishedDownloading(fileReference); - alert.handle.save_resume_data(); - _fileDownloader.didRequestSRD(); - } - - void operator()(const libtorrent::save_resume_data_failed_alert& alert) const { - LOG(warning, "save resume data failed: %s -- %s", - alert.what(), alert.message().c_str()); - _fileDownloader.didReceiveSRD(); - } - - void operator()(const libtorrent::save_resume_data_alert& alert) const { - defaultHandler(alert); - - fs::ofstream resumeFile(resumeDataPathTemp(alert.handle), std::ios_base::binary); - resumeFile.unsetf(std::ios_base::skipws); - libtorrent::bencode(std::ostream_iterator<char>(resumeFile), *alert.resume_data); - resumeFile.close(); - fs::rename(resumeDataPathTemp(alert.handle), resumeDataPath(alert.handle)); - _fileDownloader.didReceiveSRD(); - } - - void handle(std::unique_ptr<libtorrent::alert> alert) { - try { - libtorrent::handle_alert< - libtorrent::torrent_finished_alert, - libtorrent::save_resume_data_alert, - libtorrent::save_resume_data_failed_alert, - libtorrent::listen_failed_alert, - libtorrent::file_error_alert, - libtorrent::fastresume_rejected_alert, - libtorrent::torrent_delete_failed_alert> - dispatch(alert, *this); - } catch (libtorrent::unhandled_alert& e) { - LOG(debug, "alert (ignored): %s -- %s", - alert->what(), alert->message().c_str()); - } - } -}; - -FileDownloader::LogSessionDeconstructed::~LogSessionDeconstructed() -{ - LOG(debug, "Libtorrent session closed successfully."); -} - -FileDownloader::FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker, - const std::string& hostName, int port, - const fs::path& dbPath) - : _outstanding_SRD_requests(0), - _tracker(tracker), - _session(tracker.get(), libtorrent::fingerprint("vp", 0, 0, 0, 0), 0), - _closed(false), - _dbPath(dbPath), - _hostName(hostName), - _port(port) -{ - if (!fs::exists(_dbPath)) { - fs::create_directories(_dbPath); - } - addNewDbFiles(_dbPath); - - _session.set_settings(createSessionSettings()); - listen(); - _session.set_alert_mask( - libtorrent::alert::error_notification | - libtorrent::alert::status_notification); - -} - -void -FileDownloader::drain() { - EventHandler eventHandler(this); - size_t cnt = 0; - size_t waitCount = 0; - do { - LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests.load()); - while (_session.wait_for_alert(libtorrent::milliseconds(20))) { - std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); - eventHandler.handle(std::move(alert)); - ++cnt; - } - waitCount++; - } while (!drained() && (waitCount < 1000)); - LOG(debug, "handled %zu alerts during draining.", cnt); - if (!drained()) { - LOG(error, "handled %zu alerts during draining. But there are still %zu left.", cnt, _outstanding_SRD_requests.load()); - LOG(error, "We have been waiting for stuff that did not happen."); - } -} - -FileDownloader::~FileDownloader() { - assert(drained()); -} - -void -FileDownloader::listen() { - boost::system::error_code ec; - _session.listen_on(std::make_pair(_port, _port), // (min, max) - ec, 0, // network interface (default value) - libtorrent::session::listen_no_system_port); - if (!ec && (_session.listen_port() == _port)) { - return; - } - throw vespalib::PortListenException(_port, _hostName, VESPA_STRLOC); -} - -boost::optional< fs::path > -FileDownloader::pathToCompletedFile(const std::string& fileReference) const { - libtorrent::torrent_handle torrent = _session.find_torrent(toInfoHash(fileReference)); - -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - if (torrent.is_valid() && torrent.is_finished()) { -#pragma GCC diagnostic pop - return _dbPath / fileReference / getMainFile(torrent); - } else { - return boost::optional< fs::path>(); - } -} - - -boost::optional<FileDownloader::ResumeDataBuffer> -FileDownloader::getResumeData(const std::string& fileReference) { - LOG(debug, "Reading resume data for '%s'", fileReference.c_str()); - try { - fs::path path = (_dbPath / fileReference).string() + resumeDataSuffix; - if (fs::exists(path)) { - fs::ifstream file(path, std::ios::binary); - ResumeDataBuffer result; - - std::istream_iterator<char> iterator(file), end; - std::copy(iterator, end, std::back_inserter(result)); - LOG(debug, "Successfully retrieved resume data for '%s'", fileReference.c_str()); - if (result.size() < 50) { - LOG(info, "Very small resume file %zu bytes.", result.size()); - } - - return result; - } - } catch(...) { - //resume data is only an optimization - LOG(info, "Error while reading resume data for '%s'", fileReference.c_str()); - } - return boost::optional<ResumeDataBuffer>(); -} - - -bool -FileDownloader::hasTorrent(const std::string& fileReference) const { - return _session.find_torrent(toInfoHash(fileReference)).is_valid(); -} - -void -FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffer) { - if (closed()) { return; } - LockGuard guard(_modifyTorrentsDownloadingMutex); - - boost::optional<ResumeDataBuffer> resumeData = getResumeData(fileReference); - - if (_session.find_torrent( (toInfoHash(fileReference))).is_valid()) - return; - - libtorrent::lazy_entry entry; -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(), entry); //out -#pragma GCC diagnostic pop - - libtorrent::add_torrent_params torrentParams; - torrentParams.save_path = (_dbPath / fileReference).string(); - torrentParams.ti = new libtorrent::torrent_info(entry); - - torrentParams.auto_managed = false; - torrentParams.paused = false; - - if (resumeData) { - torrentParams.resume_data = *resumeData; //vector will be swapped - } - - libtorrent::torrent_handle torrentHandle = _session.add_torrent(torrentParams); - - LOG(debug, "Started downloading file '%s' with file reference '%s'.", - getMainName(torrentHandle).c_str(), fileReference.c_str()); -} - - -void -FileDownloader::deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&) { - if (torrent.is_valid()) { - fs::path resumeFilePath = resumeDataPath(torrent); - -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - fs::path savePath = torrent.save_path(); -#pragma GCC diagnostic pop - - //the files might not exist, so ignore return value - fs::remove_all(savePath); - fs::remove(resumeFilePath); - } - - _downloadFailed(fileReferenceToString(torrent.info_hash()), FileProvider::FileReferenceRemoved); -} - -void -FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain) { - if (closed()) { return; } - LockGuard guard(_modifyTorrentsDownloadingMutex); - - std::set<std::string> currentFiles; - std::set<sha1_hash> infoHashesToRetain; - for (const std::string& fileReference : filesToRetain) { - infoHashesToRetain.insert(toInfoHash(fileReference)); - } - - std::vector<torrent_handle> torrents = _session.get_torrents(); - - for (torrent_handle torrent : torrents) { - if (!infoHashesToRetain.count(torrent.info_hash())) { - LOG(info, "Removing torrent: '%s' with file reference '%s'", - getMainName(torrent).c_str(), fileReferenceToString(torrent.info_hash()).c_str()); - - deleteTorrentData(torrent, guard); - _session.remove_torrent(torrent); - } - } -} - - -void FileDownloader::runEventLoop() { - EventHandler eventHandler(this); - while ( ! closed() ) { - try { - if (_session.wait_for_alert(libtorrent::milliseconds(100))) { - std::unique_ptr<libtorrent::alert> alert = _session.pop_alert(); - eventHandler.handle(std::move(alert)); - } - } catch (const ZKConnectionLossException & e) { - LOG(info, "Connection loss in downloader event loop, resuming. %s", e.what()); - } catch (const vespalib::PortListenException & e) { - LOG(warning, "Failed listening to torrent port : %s", e.what()); - std::quick_exit(21); - } catch (const boost::filesystem::filesystem_error & e) { - LOG(warning, "Some boost file operations failed : %s", e.what()); - std::quick_exit(22); - } - } - drain(); -} - -bool -FileDownloader::closed() const -{ - return _closed.load(); -} - -void -FileDownloader::close() -{ - _closed.store(true); -} - -void -FileDownloader::signalIfFinishedDownloading(const std::string& fileReference) { - boost::optional<fs::path> path = pathToCompletedFile(fileReference); - - if (path) { - _downloadCompleted(fileReference, *path); - } -} - -std::string -FileDownloader::infoHash2FileReference(const libtorrent::sha1_hash& infoHash) { - //TODO - return fileReferenceToString(infoHash); -} - -namespace { -int -toBytesPerSec(double MBPerSec) { - return static_cast<int>(MBPerSec * 1024 * 1024); -} -} - -void -FileDownloader::setMaxDownloadSpeed(double MBPerSec) { - LOG(config, "Setting max download speed to %f MB/sec", MBPerSec); -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - _session.set_download_rate_limit(toBytesPerSec(MBPerSec)); -#pragma GCC diagnostic pop -} - -void -FileDownloader::setMaxUploadSpeed(double MBPerSec) { - LOG(config, "Setting max upload speed to %f MB/sec", MBPerSec); -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - _session.set_upload_rate_limit(toBytesPerSec(MBPerSec)); -#pragma GCC diagnostic pop -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h deleted file mode 100644 index 3f608e85e64..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vector> -#include <mutex> -#include <boost/optional.hpp> - -#include <libtorrent/session.hpp> - -#include <vespa/filedistribution/rpc/fileprovider.h> -#include "hostname.h" -#include <vespa/filedistribution/common/buffer.h> -#include <vespa/filedistribution/common/exception.h> -#include <vespa/filedistribution/model/filedbmodel.h> - -namespace filedistribution { - -VESPA_DEFINE_EXCEPTION(NoSuchTorrentException, vespalib::Exception); - -class FileDownloader -{ - struct EventHandler; - struct LogSessionDeconstructed { - ~LogSessionDeconstructed(); - }; - - std::atomic<size_t> _outstanding_SRD_requests; - std::shared_ptr<FileDistributionTracker> _tracker; - - std::mutex _modifyTorrentsDownloadingMutex; - typedef std::lock_guard<std::mutex> LockGuard; - - LogSessionDeconstructed _logSessionDeconstructed; - //session is safe to use from multiple threads. - libtorrent::session _session; - std::atomic<bool> _closed; - - const Path _dbPath; - typedef std::vector<char> ResumeDataBuffer; - boost::optional<ResumeDataBuffer> getResumeData(const std::string& fileReference); - - class RemoveTorrent; - - void deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&); - void listen(); - bool closed() const; - void drain(); -public: - // accounting of save-resume-data requests: - void didRequestSRD() { ++_outstanding_SRD_requests; } - void didReceiveSRD() { --_outstanding_SRD_requests; } - - typedef FileProvider::DownloadCompletedSignal DownloadCompletedSignal; - typedef FileProvider::DownloadFailedSignal DownloadFailedSignal; - - FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker, - const std::string& hostName, int port, - const Path& dbPath); - ~FileDownloader(); - DirectoryGuard::UP getGuard() { return std::make_unique<DirectoryGuard>(_dbPath); } - - void runEventLoop(); - void addTorrent(const std::string& fileReference, const Buffer& buffer); - bool hasTorrent(const std::string& fileReference) const; - boost::optional<Path> pathToCompletedFile(const std::string& fileReference) const; - void removeAllTorrentsBut(const std::set<std::string> & filesToRetain); - - void signalIfFinishedDownloading(const std::string& fileReference); - - std::string infoHash2FileReference(const libtorrent::sha1_hash& hash); - void setMaxDownloadSpeed(double MBPerSec); - void setMaxUploadSpeed(double MBPerSec); - void close(); - bool drained() const { return _outstanding_SRD_requests == 0; } - - const std::string _hostName; - const int _port; - - //signals - DownloadCompletedSignal _downloadCompleted; - DownloadFailedSignal _downloadFailed; //removed or error -}; - -} //namespace filedistribution - - diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp deleted file mode 100644 index 1763b798f03..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "filedownloadermanager.h" -#include <thread> - -#include <vespa/log/log.h> -LOG_SETUP(".filedownloadermanager"); - -using namespace std::literals; - -using filedistribution::FileDownloaderManager; -using filedistribution::Path; - -namespace { -void logStartDownload(const std::set<std::string> & filesToDownload) { - std::ostringstream msg; - msg << "StartDownloads:" << std::endl; - std::copy(filesToDownload.begin(), filesToDownload.end(), - std::ostream_iterator<std::string>(msg, "\n")); - LOG(debug, "%s", msg.str().c_str()); -} -} //anonymous namespace - -FileDownloaderManager::FileDownloaderManager( - const std::shared_ptr<FileDownloader>& downloader, - const std::shared_ptr<FileDistributionModel>& model) - - :_fileDownloader(downloader), - _fileDistributionModel(model), - _startDownloads(this), - _setFinishedDownloadingStatus(this) -{} - -FileDownloaderManager::~FileDownloaderManager() { - LOG(debug, "Deconstructing FileDownloaderManager"); -} - -void -FileDownloaderManager::start() -{ - _downloadFailedConnection = downloadFailed().connect( - DownloadFailedSignal::slot_type([&] (const std::string & peer, FileProvider::FailedDownloadReason reason) { (void) reason; removePeerStatus(peer); }).track_foreign(shared_from_this())); - - _downloadCompletedConnection = downloadCompleted().connect( - DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus).track_foreign(shared_from_this())); - - _filesToDownloadChangedConnection = _fileDistributionModel->_filesToDownloadChanged.connect( - FileDistributionModel::FilesToDownloadChangedSignal::slot_type(std::ref(_startDownloads)).track_foreign(shared_from_this())); -} - -boost::optional< Path > -FileDownloaderManager::getPath(const std::string& fileReference) { - return _fileDownloader->pathToCompletedFile(fileReference); -} - -void -FileDownloaderManager::downloadFile(const std::string& fileReference) { - { - LockGuard updateFilesToDownloadGuard(_updateFilesToDownloadMutex); - _startDownloads.downloadFile(fileReference); - } - - //if the file is already downloading but not completed before the above call, - //the finished download callback might come before the interested party - //has called connectFinishedDownloadingHandler. - //An explicit call is therefore used to mitigate this problem: - //Do not hold updateFilesToDownloadMutex when calling this, as it might cause deadlock. - _fileDownloader->signalIfFinishedDownloading(fileReference); -} - -void -FileDownloaderManager::removePeerStatus(const std::string& fileReference) { - //TODO: Simplify by using separate thread for removal: - //currently called via StartDownloads which already holds a lock on updateFilesToDownloadMutex. - - _fileDistributionModel->removePeer(fileReference); -} - -void -FileDownloaderManager::StartDownloads::downloadFile(const std::string& fileReference) { - if (!_parent._fileDownloader->hasTorrent(fileReference)) { - Buffer torrent(_parent._fileDistributionModel->getFileDBModel().getFile(fileReference)); - - _parent._fileDistributionModel->addPeer(fileReference); - _parent._fileDownloader->addTorrent(fileReference, torrent); - } -} - - -void -FileDownloaderManager::StartDownloads::operator()() { - - DirectoryGuard::UP guard = _parent._fileDownloader->getGuard(); - LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex); - - std::set<std::string> filesToDownload = _parent._fileDistributionModel->getFilesToDownload(); - logStartDownload(filesToDownload); - - std::for_each(filesToDownload.begin(), filesToDownload.end(), - [&] (const std::string& file) { downloadFile(file); }); - - _parent._fileDownloader->removeAllTorrentsBut(filesToDownload); -} - -FileDownloaderManager::StartDownloads::StartDownloads(FileDownloaderManager* parent) - :_parent(*parent) -{} - - -FileDownloaderManager::SetFinishedDownloadingStatus::SetFinishedDownloadingStatus( - FileDownloaderManager* parent) - :_parent(*parent) -{} - -void -FileDownloaderManager::SetFinishedDownloadingStatus::operator()( - const std::string& fileReference, const Path&) { - - //Prevent concurrent modifications to peer node in zk. - LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex); - - try { - _parent._fileDistributionModel->peerFinished(fileReference); - } catch (const NotPeer &) { //Probably a concurrent removal of the torrent. - - //improve chance of libtorrent session being updated. - std::this_thread::sleep_for(100ms); - if (_parent._fileDownloader->hasTorrent(fileReference)) { - - _parent._fileDistributionModel->addPeer(fileReference); - _parent._fileDistributionModel->peerFinished(fileReference); - } else { - LOG(debug, "OK: Torrent '%s' finished concurrently with its removal.", fileReference.c_str()); - } - } -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h deleted file mode 100644 index 107685b4e38..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <boost/signals2/signal.hpp> - -#include <vespa/filedistribution/rpc/fileprovider.h> -#include <vespa/filedistribution/model/filedistributionmodel.h> -#include "filedownloader.h" - -namespace filedistribution { - -class FileDownloaderManager : public FileProvider, - public std::enable_shared_from_this<FileDownloaderManager> { - - class StartDownloads { - FileDownloaderManager& _parent; - public: - void operator()(); - void downloadFile(const std::string& fileReference); - StartDownloads(FileDownloaderManager* parent); - }; - - class SetFinishedDownloadingStatus { - FileDownloaderManager& _parent; - public: - void operator()(const std::string& fileReference, const Path&); - SetFinishedDownloadingStatus(FileDownloaderManager*); - }; - - typedef std::lock_guard<std::mutex> LockGuard; - std::mutex _updateFilesToDownloadMutex; - - std::shared_ptr<FileDownloader> _fileDownloader; - std::shared_ptr<FileDistributionModel> _fileDistributionModel; - StartDownloads _startDownloads; - SetFinishedDownloadingStatus _setFinishedDownloadingStatus; - - boost::signals2::scoped_connection _downloadFailedConnection; - boost::signals2::scoped_connection _downloadCompletedConnection; - boost::signals2::scoped_connection _filesToDownloadChangedConnection; - - void removePeerStatus(const std::string& fileReference); -public: - using SP = std::shared_ptr<FileDownloaderManager>; - FileDownloaderManager(const FileDownloaderManager &) = delete; - FileDownloaderManager & operator = (const FileDownloaderManager &) = delete; - FileDownloaderManager(const std::shared_ptr<FileDownloader>&, - const std::shared_ptr<FileDistributionModel>& model); - ~FileDownloaderManager(); - void start(); - - boost::optional<Path> getPath(const std::string& fileReference) override; - void downloadFile(const std::string& fileReference) override; - - //FileProvider overrides - DownloadCompletedSignal& downloadCompleted() override { - return _fileDownloader->_downloadCompleted; - } - - DownloadFailedSignal& downloadFailed() override { - return _fileDownloader->_downloadFailed; - } -}; - -} //namespace filedistribution - - diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp b/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp deleted file mode 100644 index 253cc822b3c..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "hostname.h" -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/net/socket_address.h> -#include <vespa/log/log.h> -LOG_SETUP(".hostname"); - -namespace filedistribution { - -std::string -lookupIPAddress(const std::string& hostName) -{ - auto best_addr = vespalib::SocketAddress::select_remote(0, hostName.c_str()); - if (!best_addr.valid()) { - throw filedistribution::FailedResolvingHostName(hostName, VESPA_STRLOC); - } - const std::string address = best_addr.ip_address(); - LOG(debug, "Resolved hostname'%s' as '%s'", hostName.c_str(), address.c_str()); - return address; -} - -VESPA_IMPLEMENT_EXCEPTION(FailedResolvingHostName, vespalib::Exception); - -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.h b/filedistribution/src/vespa/filedistribution/distributor/hostname.h deleted file mode 100644 index c4aa7c50787..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/hostname.h +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <string> -#include <vespa/filedistribution/common/exception.h> - -namespace filedistribution { - -std::string lookupIPAddress(const std::string& hostName); - -VESPA_DEFINE_EXCEPTION(FailedResolvingHostName, vespalib::Exception); - -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp deleted file mode 100644 index fec2e94dbb2..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "scheduler.h" - -namespace asio = boost::asio; - -using filedistribution::Scheduler; -typedef Scheduler::Task Task; - -Task::Task(Scheduler& scheduler) - : _timer(scheduler.ioService) -{} - -void -Task::schedule(asio::deadline_timer::duration_type delay) -{ - _timer.expires_from_now(delay); - std::shared_ptr<Task> self = shared_from_this();; - _timer.async_wait([self](const auto & e) { self->handle(e); }); -} - -void -Task::scheduleNow() -{ - schedule(boost::posix_time::seconds(0)); -} - -void -Task::handle(const boost::system::error_code& code) { - if (code != asio::error::operation_aborted) { - doHandle(); - } -} - - -Scheduler::Scheduler(std::function<void (asio::io_service&)> callRun) - :_keepAliveWork(ioService), - _workerThread([&, callRun]() { callRun(ioService); }) -{} - -Scheduler::~Scheduler() { - ioService.stop(); - _workerThread.join(); - ioService.reset(); -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h deleted file mode 100644 index 9ac53656127..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <boost/asio/io_service.hpp> -#include <boost/asio/deadline_timer.hpp> -#include <thread> - - -namespace filedistribution { - -class Scheduler { -public: - class Task : public std::enable_shared_from_this<Task> { - boost::asio::deadline_timer _timer; - public: - typedef std::shared_ptr<Task> SP; - - Task(Scheduler& scheduler); - - virtual ~Task() {} - - void schedule(boost::asio::deadline_timer::duration_type delay); - void scheduleNow(); - - void handle(const boost::system::error_code& code); - protected: - virtual void doHandle() = 0; - }; - -private: - boost::asio::io_service ioService; - - //keeps io_service.run() from exiting until it has been destructed, - //see http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/reference/io_service.html - boost::asio::io_service::work _keepAliveWork; - std::thread _workerThread; - -public: - Scheduler(const Scheduler &) = delete; - Scheduler & operator = (const Scheduler &) = delete; - Scheduler(std::function<void (boost::asio::io_service&)> callRun) ; - ~Scheduler(); -}; - -} - diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp deleted file mode 100644 index 35ff9e4e8ca..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "signalhandling.h" -#include <vespa/vespalib/util/signalhandler.h> - -#include <vespa/log/log.h> -LOG_SETUP(".signalhandling"); - -typedef vespalib::SignalHandler SIG; - -void -initSignals() { - SIG::PIPE.ignore(); - SIG::INT.hook(); - SIG::TERM.hook(); - SIG::USR1.hook(); -} - -bool -askedToShutDown() { - bool result = SIG::INT.check() || SIG::TERM.check(); - if (result) { - LOG(debug, "Asked to shut down."); - } - return result; -} - -bool -askedToReinitialize() { - bool result = SIG::USR1.check(); - if (result) { - LOG(debug, "Asked to reinitialize."); - } - return result; -} - -void -clearReinitializeFlag() { - SIG::USR1.clear(); -} diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h deleted file mode 100644 index 28713d16aaa..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -void -initSignals(); - -bool -askedToShutDown(); - -bool -askedToReinitialize(); - -void -clearReinitializeFlag(); - diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp deleted file mode 100644 index fbdf8155302..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "state_server_impl.h" - -namespace filedistribution { - -} // namespace filedistribution diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h deleted file mode 100644 index 0017995c776..00000000000 --- a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/vespalib/net/state_server.h> -#include <vespa/vespalib/net/simple_metrics_producer.h> -#include <vespa/vespalib/net/simple_health_producer.h> -#include <vespa/vespalib/net/simple_component_config_producer.h> - -namespace filedistribution { - -struct StateServerImpl { - vespalib::SimpleHealthProducer myHealth; - vespalib::SimpleMetricsProducer myMetrics; - vespalib::SimpleComponentConfigProducer myComponents; - vespalib::StateServer myStateServer; - - StateServerImpl(int port) : myStateServer(port, myHealth, myMetrics, myComponents) {} -}; - -} // namespace filedistribution diff --git a/filedistribution/src/vespa/filedistribution/manager/.gitignore b/filedistribution/src/vespa/filedistribution/manager/.gitignore deleted file mode 100644 index b94a4aae8e0..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -.depend -Makefile -com_yahoo_vespa_filedistribution_*.h -*.So -/libfiledistributionmanager.so.5.1 diff --git a/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt deleted file mode 100644 index b5798b7b72e..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(filedistribution_filedistributionmanager - SOURCES - createtorrent.cpp - filedb.cpp - stderr_logfwd.cpp - $<TARGET_OBJECTS:filedistribution_filedbmodel> - $<TARGET_OBJECTS:filedistribution_exceptionrethrower> - INSTALL lib64 - OUTPUT_NAME filedistributionmanager - DEPENDS - boost_system${VESPA_BOOST_LIB_SUFFIX} - boost_thread${VESPA_BOOST_LIB_SUFFIX} - boost_filesystem${VESPA_BOOST_LIB_SUFFIX} - zookeeper_mt - ${JAVA_JVM_LIBRARY} -) diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp deleted file mode 100644 index 6de773ebd67..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "createtorrent.h" - -#include <libtorrent/torrent_info.hpp> -#include <boost/filesystem/convenience.hpp> - -#include <iostream> -#include <fstream> -#include <cmath> -#include <iterator> -#include <sstream> -#include <string> - -namespace fs = boost::filesystem; - -namespace { - -const libtorrent::size_type targetTorrentSize = 64 * 1024; - -void -aggregateFilenames(std::vector<std::string>& accumulator, const fs::path& path, std::string currPrefix) -{ - if (fs::is_directory(path)) { - for (fs::directory_iterator i(path), end; - i != end; - ++i) { - std::string newPrefix = currPrefix + "/" + std::string(i->path().filename().c_str()); - accumulator.push_back(newPrefix); - aggregateFilenames(accumulator, *i, newPrefix); - } - } -} - - -libtorrent::entry -createEntry(const fs::path& path) { - if (!fs::exists(path)) - throw std::runtime_error("Path '" + std::string(path.filename().c_str()) + " does not exists"); - - libtorrent::file_storage fileStorage; - libtorrent::add_files(fileStorage, path.string()); - - libtorrent::create_torrent torrent(fileStorage); - torrent.set_creator("vespa-filedistributor"); - torrent.set_priv(true); - torrent.add_tracker(""); - - libtorrent::set_piece_hashes(torrent, path.branch_path().string()); - return torrent.generate(); -} - -std::string -fileReferenceToString(const libtorrent::sha1_hash& fileReference) { - std::ostringstream fileReferenceString; - fileReferenceString <<fileReference; - return fileReferenceString.str(); -} - -} //anonymous namespace - -filedistribution:: -CreateTorrent:: -CreateTorrent(const Path& path) - :_path(path), - _entry(createEntry(_path)) -{} - -filedistribution::Buffer -filedistribution:: -CreateTorrent:: -bencode() const -{ - Buffer buffer(static_cast<int>(targetTorrentSize)); - libtorrent::bencode(std::back_inserter(buffer), _entry); - return buffer; -} - -const std::string -filedistribution:: -CreateTorrent:: -fileReference() const -{ -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" - libtorrent::torrent_info info(_entry); -#pragma GCC diagnostic pop - return fileReferenceToString(info.info_hash()); -} diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.h b/filedistribution/src/vespa/filedistribution/manager/createtorrent.h deleted file mode 100644 index d84c9ad1d25..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/createtorrent.h +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vector> -#include <libtorrent/create_torrent.hpp> - -#include <vespa/filedistribution/common/buffer.h> -#include <vespa/filedistribution/common/exception.h> - -namespace filedistribution { - -class CreateTorrent { - Path _path; - libtorrent::entry _entry; -public: - - CreateTorrent(const Path& path); - Buffer bencode() const; - const std::string fileReference() const; -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/manager/field.h b/filedistribution/src/vespa/filedistribution/manager/field.h deleted file mode 100644 index 04fbc286c8d..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/field.h +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <jni.h> - -#include <vespa/filedistribution/common/exception.h> - -namespace filedistribution { - -struct BadFieldException : std::runtime_error { - explicit BadFieldException(const std::string& name) - : runtime_error("Could not lookup field '" + name + "'") - {} - - BadFieldException(const BadFieldException& e) - : runtime_error(e) - {} -}; - -template <class T> -class LongField { - jfieldID _fieldID; -public: - LongField() - {} - - LongField(jclass clazz, const char* fieldName, JNIEnv* env) - :_fieldID(env->GetFieldID(clazz, fieldName, "J")) { - - if (!_fieldID) - BOOST_THROW_EXCEPTION(BadFieldException(fieldName)); - } - - void set(jobject obj, T value, JNIEnv* env) { - jlong longValue = reinterpret_cast<long>(value); - env->SetLongField(obj, _fieldID, longValue); - } - - T get(jobject obj, JNIEnv* env) { - jlong longValue = env->GetLongField(obj, _fieldID); - return reinterpret_cast<T>(longValue); - } -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.cpp b/filedistribution/src/vespa/filedistribution/manager/filedb.cpp deleted file mode 100644 index bc725684846..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/filedb.cpp +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "filedb.h" - -#include <boost/filesystem.hpp> - -namespace fs = boost::filesystem; - -namespace { - -void copyDirectory(fs::path original, fs::path destination) -{ - fs::create_directory(destination); - for (fs::directory_iterator curr(original), end; - curr != end; - ++curr) { - - fs::path destPath = destination / curr->path().filename(); - if ( fs::is_directory(curr->status()) ) { - copyDirectory(*curr, destPath); - } else { - fs::copy_file(*curr, destPath); - } - } -} - -} //anonymous namespace - -namespace filedistribution { - -FileDB::FileDB(fs::path dbPath) - : _dbPath(dbPath) {} - - -bool -FileDB::add(const DirectoryGuard & directoryGuard, fs::path original, const std::string &name) { - (void) directoryGuard; - fs::path finalPath = _dbPath / name; - fs::path targetPath = _dbPath / (name + ".new"); - if (fs::exists(finalPath) || fs::exists(targetPath)) { - return false; - } - fs::path targetPathTemp = _dbPath / (name + ".tmp"); - - if (fs::exists(targetPathTemp)) { - fs::remove_all(targetPathTemp); - } - - fs::create_directory(targetPathTemp); - if (!fs::is_directory(original)) { - fs::copy_file(original, targetPathTemp / original.filename()); - } else { - copyDirectory(original, targetPathTemp / original.filename()); - } - - assert(!fs::exists(targetPath)); - fs::rename(targetPathTemp, targetPath); - return true; -} - -} diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.h b/filedistribution/src/vespa/filedistribution/manager/filedb.h deleted file mode 100644 index 4ab840a5595..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/filedb.h +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <string> -#include <vespa/filedistribution/model/filedbmodel.h> - -namespace filedistribution { - -class FileDB { - Path _dbPath; -public: - FileDB(Path dbPath); - DirectoryGuard::UP getGuard() { return std::make_unique<DirectoryGuard>(_dbPath); } - /** - * - * @param directoryGuard The guard you need to hold in order to prevent someone fidling with your directory. - * @param original The file top copy - * @param name The name the file shall have. - * @return true if it was added, false if it was already present. - */ - bool add(const DirectoryGuard & directoryGuard, Path original, const std::string& name); -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp deleted file mode 100644 index f0825343f27..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "jnistring.h" -#include "field.h" -#include "createtorrent.h" -#include "filedb.h" -#include <vespa/filedistribution/manager/com_yahoo_vespa_filedistribution_FileDistributionManager.h> - -#include <vespa/filedistribution/model/filedistributionmodel.h> -#include <vespa/filedistribution/model/zkfiledbmodel.h> -#include <vespa/filedistribution/model/mockfiledistributionmodel.h> - -using namespace filedistribution; - -namespace fs = boost::filesystem; - -namespace { - -class NativeFileDistributionManager { - public: - std::unique_ptr<FileDBModel> _fileDBModel; - std::unique_ptr<FileDB> _fileDB; -}; - -LongField<NativeFileDistributionManager*> nativeFileDistributionManagerField; - -void throwRuntimeException(const char* msg, JNIEnv* env) -{ - //do not mask active exception. - if (!env->ExceptionOccurred()) { - jclass runtimeExceptionClass = env->FindClass("java/lang/RuntimeException"); - if (runtimeExceptionClass) { - env->ThrowNew(runtimeExceptionClass, msg); - } - } -} - -template <class FIELD> -void deleteField(FIELD& field, jobject self, JNIEnv* env) -{ - delete field.get(self, env); - field.set(self, 0, env); -} - -std::unique_ptr<ZKLogging> _G_zkLogging; - -} //anonymous namespace - - -#define STANDARDCATCH(returnStatement) \ - catch (const std::bad_alloc&) { \ - std::cerr<<"Error: Out of memory" <<std::endl; \ - /*might fail, therefore also uses stderror message*/ \ - throwRuntimeException("Out of memory", env); \ - returnStatement; \ - } catch(const ZKException& e) { \ - std::stringstream ss; \ - ss << "In" << __FUNCTION__ << ": "; \ - ss << e.what(); \ - throwRuntimeException(ss.str().c_str(), env); \ - returnStatement; \ - } catch(const std::exception& e) { \ - throwRuntimeException(e.what(), env); \ - returnStatement; \ - } - -JNIEXPORT -void JNICALL -Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setup( - JNIEnv *env, jclass self) -{ - try { - _G_zkLogging = std::make_unique<ZKLogging>(); - nativeFileDistributionManagerField = LongField<NativeFileDistributionManager*>(self, "nativeFileDistributionManager", env); - } STANDARDCATCH() -} - - -namespace { -void initMockFileDBModel(NativeFileDistributionManager& manager) -{ - manager._fileDBModel.reset(new MockFileDBModel()); -} - -void initFileDBModel(NativeFileDistributionManager& manager, const std::string& zkServers) -{ - //Ignored for now, since we're not installing any watchers. - std::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers, true)); - manager._fileDBModel.reset(new ZKFileDBModel(zk)); -} -} //end anonymous namespace - -JNIEXPORT -void JNICALL -Java_com_yahoo_vespa_filedistribution_FileDistributionManager_init( - JNIEnv *env, jobject self, - jbyteArray fileDBPathArg, jbyteArray zkServersArg) -{ - try { - JNIString zkServers(zkServersArg, env); - - nativeFileDistributionManagerField.set(self, new NativeFileDistributionManager(), env); - NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env); - if (zkServers._value == "mockfiledistributionmodel.testing") { - initMockFileDBModel(manager); - } else { - initFileDBModel(manager, zkServers._value); - } - - JNIString fileDBPath(fileDBPathArg, env); - manager._fileDB.reset(new FileDB(fileDBPath._value)); - - } STANDARDCATCH() -} - - -JNIEXPORT -jstring JNICALL -Java_com_yahoo_vespa_filedistribution_FileDistributionManager_addFileImpl( - JNIEnv *env, jobject self, - jbyteArray completePathArg) -{ - try { - JNIString completePath(completePathArg, env); - CreateTorrent createTorrent(completePath._value); - - std::string fileReference = createTorrent.fileReference(); - NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env); - - DirectoryGuard::UP guard = manager._fileDB->getGuard();// This prevents the filedistributor from working in an inconsistent state. - bool freshlyAdded = manager._fileDB->add(*guard, completePath._value, fileReference); - - FileDBModel& model = *manager._fileDBModel; - bool hasRegisteredFile = model.hasFile(fileReference); - if (! hasRegisteredFile ) { - model.addFile(fileReference, createTorrent.bencode()); - } - if (freshlyAdded == hasRegisteredFile) { - std::cerr << "freshlyAdded(" << freshlyAdded << ") == hasRegisteredFile(" << hasRegisteredFile - << "), which is very odd. File is '" << fileReference << "'" << std::endl; - } - - //contains string with the characters 0-9 a-f - return env->NewStringUTF(fileReference.c_str()); - } STANDARDCATCH(return 0) -} - - -JNIEXPORT -void JNICALL -Java_com_yahoo_vespa_filedistribution_FileDistributionManager_shutdown( - JNIEnv *env, jobject self) -{ - deleteField(nativeFileDistributionManagerField, self, env); -} - - -JNIEXPORT -void JNICALL -Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setDeployedFilesImpl( - JNIEnv *env, jobject self, jbyteArray hostNameArg, - jbyteArray appIdArg, jobjectArray fileReferencesArg) -{ - try { - JNIString hostName(hostNameArg, env); - JNIString appId(appIdArg, env); - JNIArray<JNIString> fileReferences(fileReferencesArg, env); - - nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> - setDeployedFilesToDownload(hostName._value, appId._value, fileReferences._value); - } STANDARDCATCH() -} - - -JNIEXPORT -void JNICALL -Java_com_yahoo_vespa_filedistribution_FileDistributionManager_removeDeploymentsThatHaveDifferentApplicationIdImpl( - JNIEnv *env, jobject self, jobjectArray hostNamesArg, jbyteArray appIdArg) -{ - try { - JNIArray<JNIString> hostNames(hostNamesArg, env); - JNIString appId(appIdArg, env); - - nativeFileDistributionManagerField.get(self, env)->_fileDBModel-> - removeDeploymentsThatHaveDifferentApplicationId(hostNames._value, appId._value); - } STANDARDCATCH() -} - diff --git a/filedistribution/src/vespa/filedistribution/manager/jnistring.h b/filedistribution/src/vespa/filedistribution/manager/jnistring.h deleted file mode 100644 index fbdb2e88aee..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/jnistring.h +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <exception> -#include <string> -#include <vector> -#include <jni.h> - -namespace filedistribution { - -class JNIString { - struct Repr { - JNIEnv* _env; - jbyteArray _str; - jint _size; - char* _repr; - - Repr(jbyteArray str, JNIEnv* env) - :_env(env), - _str(str), - _size(_env->GetArrayLength(str)), - _repr(reinterpret_cast<char*>(_env->GetPrimitiveArrayCritical(str, 0))) { - - if (!_repr) - throw std::bad_alloc(); - } - - ~Repr() { - _env->ReleasePrimitiveArrayCritical(_str, _repr, 0); - _env->DeleteLocalRef(_str); - } - }; -public: - typedef jbyteArray JavaValue; - typedef std::string Value; - Value _value; - - JNIString(jbyteArray str, JNIEnv* env) { - Repr repr(str, env); - Value result(repr._repr, repr._repr + repr._size); - _value.swap(result); - } -}; - -class JNIUtf8String { -public: - typedef jstring JavaValue; - typedef std::string Value; - std::string _value; - - JNIUtf8String(jstring str, JNIEnv* env) - :_value(env->GetStringUTFLength(str), 0) - { - env->GetStringUTFRegion(str, 0, _value.begin() - _value.end(), &*_value.begin()); - env->DeleteLocalRef(str); - } -}; - -template <class JNITYPE> -class JNIArray { -public: - std::vector<typename JNITYPE::Value> _value; - - JNIArray(jobjectArray array, JNIEnv* env) { - jsize length = env->GetArrayLength(array); - _value.reserve(length); - - for (jsize i=0; i<length; ++i) { - jobject element = env->GetObjectArrayElement(array, i); - JNITYPE elementValue(static_cast<typename JNITYPE::JavaValue>(element), env); - _value.push_back(elementValue._value); - } - env->DeleteLocalRef(array); - } -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp b/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp deleted file mode 100644 index 5273fb562cf..00000000000 --- a/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/filedistribution/common/logfwd.h> - -#include <iostream> -#include <vector> - - -void filedistribution::logfwd::log_forward(LogLevel level, const char* file, int line, const char* fmt, ...) -{ - if (level == debug || level == info) - return; - - const size_t maxSize(0x8000); - std::vector<char> payload(maxSize); - char * buf = &payload[0]; - - va_list args; - va_start(args, fmt); - vsnprintf(buf, maxSize, fmt, args); - va_end(args); - - std::cerr <<"Error: " << buf <<" File: " <<file <<" Line: " <<line <<std::endl; -} diff --git a/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt deleted file mode 100644 index 51af6b106dc..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(filedistribution_filedbmodel OBJECT - SOURCES - zkfiledbmodel.cpp - zkfacade.cpp - deployedfilestodownload.cpp - DEPENDS -) -vespa_add_library(filedistribution_filedistributionmodel STATIC - SOURCES - deployedfilestodownload.cpp - filedistributionmodelimpl.cpp - zkfacade.cpp - zkfiledbmodel.cpp - DEPENDS -) -vespa_add_target_external_dependency(filedistribution_filedistributionmodel zookeeper_mt) - diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp deleted file mode 100644 index 13daecbe5c1..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "deployedfilestodownload.h" -#include <vespa/filedistribution/common/logfwd.h> - -using filedistribution::DeployedFilesToDownload; -using filedistribution::Path; - -typedef std::vector<std::string> StringVector; - -namespace filedistribution { - -const Path getApplicationIdPath(const Path & parent) { return parent / "appId"; } - -const std::string -readApplicationId(filedistribution::ZKFacade & zk, const Path & deployNode) -{ - if (zk.hasNode(getApplicationIdPath(deployNode))) { - return zk.getString(getApplicationIdPath(deployNode)); - } - return "default:default:default"; -} - -} - -const Path -DeployedFilesToDownload::addNewDeployNode(Path parentPath, const FileReferences& files) { - Path path = parentPath / "deploy_"; - - std::ostringstream filesStream; - if (!files.empty()) { - filesStream << files[0]; - std::for_each(files.begin() +1, files.end(), [&](const auto & v) { filesStream << '\n' << v; }); - } - Path retPath = _zk.createSequenceNode(path, filesStream.str().c_str(), filesStream.str().length()); - return retPath; -} - -void -DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath) { - std::map<std::string, StringVector> childrenPerId(groupChildrenByAppId(parentPath, _zk.getChildren(parentPath))); - for (auto & kv : childrenPerId) { - deleteExpiredDeployNodes(parentPath, kv.second); - } -} - -std::map<std::string, StringVector> -DeployedFilesToDownload::groupChildrenByAppId(const Path & parentPath, const StringVector & children) -{ - std::map<std::string, StringVector> childrenById; - std::for_each(std::begin(children), std::end(children), - [&](const std::string & childName) - { - Path childPath = parentPath / childName; - std::string appId(readApplicationId(_zk, childPath)); - childrenById[appId].push_back(childName); - }); - return childrenById; -} - -void -DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath, StringVector children) -{ - if (children.size() > numberOfDeploymentsToKeepFilesFrom) { - std::sort(children.begin(), children.end()); - - size_t numberOfNodesToDelete = children.size() - numberOfDeploymentsToKeepFilesFrom; - std::for_each(children.begin(), children.begin() + numberOfNodesToDelete, - [&](const std::string & s) {_zk.remove(parentPath / s); }); - } -} - -void -DeployedFilesToDownload::addAppIdToDeployNode(const Path & deployNode, const std::string & appId) -{ - _zk.setData(getApplicationIdPath(deployNode), appId.c_str(), appId.length()); -} - -void -DeployedFilesToDownload::setDeployedFilesToDownload( - const std::string& hostName, - const std::string& applicationId, - const FileReferences& files) { - Path parentPath = getPath(hostName); - _zk.setData(parentPath, "", 0); - - const Path deployNode(addNewDeployNode(parentPath, files)); - addAppIdToDeployNode(deployNode, applicationId); - deleteExpiredDeployNodes(parentPath); -} - -//Nothrow -template <typename INSERT_ITERATOR> -void -DeployedFilesToDownload::readDeployFile(const Path& path, INSERT_ITERATOR insertionIterator) { - LOGFWD(debug, "Reading deploy file '%s", path.string().c_str()); - - - try { - Buffer buffer(_zk.getData(path)); - std::string stringBuffer(buffer.begin(), buffer.end()); - std::istringstream stream(stringBuffer); - - typedef std::istream_iterator<std::string> iterator; - std::copy(iterator(stream), iterator(), insertionIterator); - } catch (const ZKNodeDoesNotExistsException& e) { - //Node deleted, no problem. - LOGFWD(debug, "Deploy file '%s' deleted.", path.string().c_str()); - } -} - -const DeployedFilesToDownload::FileReferences -DeployedFilesToDownload::getDeployedFilesToDownload( - const std::string& hostName, - const ZKFacade::NodeChangedWatcherSP& watcher) { - - try { - StringVector deployedFiles = _zk.getChildren(getPath(hostName), watcher); - FileReferences fileReferences; - - for (StringVector::iterator i = deployedFiles.begin(); i != deployedFiles.end(); ++i) { - readDeployFile(getPath(hostName) / *i, std::back_inserter(fileReferences)); - } - - return fileReferences; - } catch (const ZKNodeDoesNotExistsException&) { - //Add watch waiting for the node to appear: - if (_zk.hasNode(getPath(hostName), watcher)) { - return getDeployedFilesToDownload(hostName, watcher); - } else { - return FileReferences(); - } - } -} - -const DeployedFilesToDownload::FileReferences -DeployedFilesToDownload::getLatestDeployedFilesToDownload(const std::string& hostName) -{ - StringVector deployedFiles = _zk.getChildren(getPath(hostName)); - std::sort(deployedFiles.begin(), deployedFiles.end()); - - FileReferences fileReferences; - if (deployedFiles.empty()) { - return fileReferences; - } else { - readDeployFile(getPath(hostName) / *deployedFiles.rbegin(), std::back_inserter(fileReferences)); - return fileReferences; - } -} diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h deleted file mode 100644 index fc726170084..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "zkfacade.h" -#include "zkfiledbmodel.h" - -namespace filedistribution { - -const std::string readApplicationId(ZKFacade & zk, const Path & deployNode); - -class DeployedFilesToDownload { - //includes the current deployment. Want 2 * number of config models, since deployment is per model - static const size_t numberOfDeploymentsToKeepFilesFrom = 14; - - ZKFacade& _zk; - - Path getPath(const std::string& hostName) { - return ZKFileDBModel::_hostsPath / hostName; - } - - //Nothrow - template <typename INSERT_ITERATOR> - void readDeployFile(const Path& path, INSERT_ITERATOR insertionIterator); - void addAppIdToDeployNode(const Path & deployNode, const std::string & appId); - std::map<std::string, std::vector<std::string> > groupChildrenByAppId(const Path & parentPath, const std::vector<std::string> & children); - void deleteExpiredDeployNodes(Path parentPath, std::vector<std::string> children); - -public: - typedef std::vector<std::string> FileReferences; - - DeployedFilesToDownload(ZKFacade* zk) - :_zk(*zk) - {} - - const Path addNewDeployNode(Path parentPath, const FileReferences& files); - - void deleteExpiredDeployNodes(Path parentPath); - - void setDeployedFilesToDownload( - const std::string& hostName, - const std::string& applicationId, - const FileReferences& files); - - /** For all the deploys available **/ - const FileReferences getDeployedFilesToDownload( - const std::string& hostName, - const ZKFacade::NodeChangedWatcherSP& watcher); - - /** For the current deploy only **/ - const FileReferences getLatestDeployedFilesToDownload(const std::string& hostName); -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/model/filedbmodel.h b/filedistribution/src/vespa/filedistribution/model/filedbmodel.h deleted file mode 100644 index c556c703b6d..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/filedbmodel.h +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/filedistribution/common/buffer.h> -#include <vespa/filedistribution/common/exception.h> - -namespace filedistribution { - -class DirectoryGuard { -public: - typedef std::unique_ptr<DirectoryGuard> UP; - DirectoryGuard(Path path); - ~DirectoryGuard(); -private: - int _fd; -}; - -VESPA_DEFINE_EXCEPTION(InvalidProgressException, vespalib::Exception); -VESPA_DEFINE_EXCEPTION(InvalidHostStatusException, vespalib::Exception); - -class FileDBModel { -public: - struct HostStatus { - enum State { finished, inProgress, notStarted }; - - State _state; - size_t _numFilesToDownload; - size_t _numFilesFinished; - }; - - FileDBModel(const FileDBModel &) = delete; - FileDBModel & operator = (const FileDBModel &) = delete; - FileDBModel() = default; - virtual ~FileDBModel(); - - virtual bool hasFile(const std::string& fileReference) = 0; - virtual void addFile(const std::string& fileReference, const Buffer& buffer) = 0; - virtual Buffer getFile(const std::string& fileReference) = 0; - virtual void cleanFiles(const std::vector<std::string>& filesToPreserve) = 0; - - virtual void setDeployedFilesToDownload(const std::string& hostName, - const std::string & appId, - const std::vector<std::string> & files) = 0; - virtual void cleanDeployedFilesToDownload( - const std::vector<std::string> & hostsToPreserve, - const std::string& appId) = 0; - virtual void removeDeploymentsThatHaveDifferentApplicationId( - const std::vector<std::string> & hostsToPreserve, - const std::string& appId) = 0; - virtual std::vector<std::string> getHosts() = 0; - - virtual HostStatus getHostStatus(const std::string& hostName) = 0; - //TODO: does not really belong here, refactor. - typedef std::vector<int8_t> Progress; // [0-100] - virtual Progress getProgress(const std::string& fileReference, - const std::vector<std::string>& hostsSortedAscending) = 0; -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h deleted file mode 100644 index 1b0ed9f4826..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vector> -#include <memory> -#include <string> -#include <set> - -#include <boost/signals2.hpp> - -#include <libtorrent/peer.hpp> -#include <vespa/filedistribution/common/buffer.h> -#include <vespa/filedistribution/common/exception.h> -#include "filedbmodel.h" - -namespace filedistribution { - -VESPA_DEFINE_EXCEPTION(NotPeer, vespalib::Exception); - -class FileDistributionModel { -public: - typedef boost::signals2::signal<void ()> FilesToDownloadChangedSignal; - typedef std::vector<libtorrent::peer_entry> PeerEntries; - - virtual FileDBModel& getFileDBModel() = 0; - - virtual std::set<std::string> getFilesToDownload() = 0; - - virtual PeerEntries getPeers(const std::string& fileReference, size_t maxPeers) = 0; - virtual void addPeer(const std::string& fileReference) = 0; - virtual void removePeer(const std::string& fileReference) = 0; - virtual void peerFinished(const std::string& fileReference) = 0; //throws NotPeer - - FileDistributionModel(const FileDistributionModel &) = delete; - FileDistributionModel & operator = (const FileDistributionModel &) = delete; - FileDistributionModel() = default; - virtual ~FileDistributionModel() {} - - FilesToDownloadChangedSignal _filesToDownloadChanged; -}; - -} //namespace filedistribution - - diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp deleted file mode 100644 index e0338cffd52..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "filedistributionmodel.h" -#include "zkfiledbmodel.h" -#include "deployedfilestodownload.h" -#include "filedistributionmodelimpl.h" -#include <boost/filesystem.hpp> -#include <zookeeper/zookeeper.h> - -#include <vespa/log/log.h> -LOG_SETUP(".filedistributionmodel"); - -namespace fs = boost::filesystem; - -using filedistribution::ZKFileDBModel; -using std::make_shared; - -namespace { -//peer format: hostName:port - -void -addPeerEntry(const std::string& peer, - filedistribution::FileDistributionModelImpl::PeerEntries& result) { - - try { - libtorrent::peer_entry peerEntry; - peerEntry.pid.clear(); - - std::istringstream stream(peer); - stream.exceptions ( std::istream::failbit | std::istream::badbit ); - - std::getline(stream, peerEntry.ip, ZKFileDBModel::_peerEntrySeparator); - stream >> peerEntry.port; - - result.push_back(peerEntry); - } catch (const std::exception&) { - LOG(warning, "Invalid peer entry: '%s'", peer.c_str()); - //Ignore invalid peer entries - } -} - -std::vector<std::string>::iterator -prunePeers(std::vector<std::string> &peers, size_t maxPeers) { - if (peers.size() <= maxPeers) - return peers.end(); - - assert (maxPeers < 2147483648); //due to the usage of rand() - - const size_t peersSize = peers.size(); - for (size_t i=0; i<maxPeers; ++i) { - //i <= i + (std::rand() % ( peersSize -i )) <= peerSize - 1 - size_t candidateBetween_i_and_peerSize = i + ( std::rand() % ( peersSize -i )); - std::swap(peers[i], peers[candidateBetween_i_and_peerSize]); - } - - return peers.begin() + maxPeers; -} - -} //anonymous namespace - -namespace filedistribution { - -VESPA_IMPLEMENT_EXCEPTION(NotPeer, vespalib::Exception); - -} - -using filedistribution::FileDistributionModelImpl; - -struct FileDistributionModelImpl::DeployedFilesChangedCallback : - public ZKFacade::NodeChangedWatcher -{ - typedef std::shared_ptr<DeployedFilesChangedCallback> SP; - - std::weak_ptr<FileDistributionModelImpl> _parent; - - DeployedFilesChangedCallback(const std::shared_ptr<FileDistributionModelImpl> & parent) - :_parent(parent) - {} - - //override - void operator()() override { - if (std::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) { - model->_filesToDownloadChanged(); - } - } -}; - - -FileDistributionModelImpl::~FileDistributionModelImpl() { - LOG(debug, "Deconstructing FileDistributionModelImpl"); -} - -FileDistributionModelImpl::PeerEntries -FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t maxPeers) { - try { - fs::path path = _fileDBModel.getPeersPath(fileReference); - - typedef std::vector<std::string> Peers; - Peers peers = _zk->getChildren(path); - // TODO: Take this port from some config somewhere instead of hardcoding - addConfigServersAsPeers(peers, getenv("services__addr_configserver"), 19093); - - Peers::iterator end = prunePeers(peers, maxPeers); - - PeerEntries result; - result.reserve(end - peers.begin()); - - std::for_each(peers.begin(), end, [&] (const std::string & s) { addPeerEntry(s, result); }); - - LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str()); - return result; - } catch(ZKNodeDoesNotExistsException&) { - LOG(debug, "No peer entries available for '%s'", fileReference.c_str()); - return PeerEntries(); - } -} - -fs::path -FileDistributionModelImpl::getPeerEntryPath(const std::string& fileReference) { - std::ostringstream entry; - entry <<_hostName << ZKFileDBModel::_peerEntrySeparator <<_port; - - return _fileDBModel.getPeersPath(fileReference) / entry.str(); -} - -void -FileDistributionModelImpl::addPeer(const std::string& fileReference) { - fs::path path = getPeerEntryPath(fileReference); - LOG(debug, "Adding peer '%s'", path.string().c_str()); - - if (_zk->hasNode(path)) { - LOG(info, "Retiring previous peer node owner."); - _zk->removeIfExists(path); - } - _zk->addEphemeralNode(path); -} - -void -FileDistributionModelImpl::removePeer(const std::string& fileReference) { - fs::path path = getPeerEntryPath(fileReference); - LOG(debug, "Removing peer '%s'", path.string().c_str()); - - _zk->removeIfExists(path); -} - -//Assumes that addPeer has been called before the torrent was started, -//so that we avoid the race condition between finishing downloading a torrent -//and setting peer status -void -FileDistributionModelImpl::peerFinished(const std::string& fileReference) { - fs::path path = getPeerEntryPath(fileReference); - LOG(debug, "Peer finished '%s'", path.string().c_str()); - - try { - bool mustExist = true; - char progress = 100; //percent - - _zk->setData(path, &progress, sizeof(char), mustExist); - } catch(ZKNodeDoesNotExistsException & e) { - NotPeer(fileReference, e, VESPA_STRLOC); - } -} - -std::set<std::string> -FileDistributionModelImpl::getFilesToDownload() { - DeployedFilesToDownload d(_zk.get()); - std::vector<std::string> deployed = d.getDeployedFilesToDownload(_hostName, - make_shared<DeployedFilesChangedCallback>(shared_from_this())); - - std::set<std::string> result(deployed.begin(), deployed.end()); - - { - LockGuard guard(_activeFileReferencesMutex); - result.insert(_activeFileReferences.begin(), _activeFileReferences.end()); - } - return result; -} - -bool -FileDistributionModelImpl::updateActiveFileReferences( - const std::vector<vespalib::string>& fileReferences) { - - std::vector<vespalib::string> sortedFileReferences(fileReferences); - std::sort(sortedFileReferences.begin(), sortedFileReferences.end()); - - LockGuard guard(_activeFileReferencesMutex); - bool changed = sortedFileReferences != _activeFileReferences; - - sortedFileReferences.swap(_activeFileReferences); - return changed; -} - -void -FileDistributionModelImpl::addConfigServersAsPeers( - std::vector<std::string> & peers, char const* envConfigServers, int port) { - - std::set<std::string> peersFromTracker(peers.begin(), peers.end()); - - if (envConfigServers == NULL) { - // Could be standalone cluster (not set for this). - return; - } - std::string configServerCommaListed(envConfigServers); - std::stringstream configserverstream(configServerCommaListed); - std::string configserver; - while (std::getline(configserverstream, configserver, ',')) { - configserver += ":" + std::to_string(port); - if (peersFromTracker.find(configserver) == peersFromTracker.end()) { - peers.push_back(configserver); - LOG(debug, "Adding configserver '%s'", configserver.c_str()); - } else { - LOG(debug, "Configserver already added '%s'", configserver.c_str()); - } - } -} - -void -FileDistributionModelImpl::configure(std::unique_ptr<FilereferencesConfig> config) { - const bool changed = updateActiveFileReferences(config->filereferences); - if (changed) { - try { - _filesToDownloadChanged(); - } catch (const ZKConnectionLossException & e) { - LOG(info, "Connection loss in reconfigure of file references, resuming. %s", e.what()); - } catch (const ZKOperationTimeoutException & e) { - LOG(warning, "Operation timed out in reconfigure of file references. " - "Will do quick exit to start a clean sheet. %s", e.what()); - std::quick_exit(41); - } - } -} diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h deleted file mode 100644 index 5374040a7f1..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "filedistributionmodel.h" -#include <vespa/config-filereferences.h> -#include "zkfacade.h" -#include "zkfiledbmodel.h" -#include <vespa/config/config.h> - -using cloud::config::filedistribution::FilereferencesConfig; - -namespace filedistribution { - -class FileDistributionModelImpl : public FileDistributionModel, - public config::IFetcherCallback<FilereferencesConfig>, - public std::enable_shared_from_this<FileDistributionModelImpl> -{ - struct DeployedFilesChangedCallback; - - const std::string _hostName; - const int _port; - - const std::shared_ptr<ZKFacade> _zk; - ZKFileDBModel _fileDBModel; - - std::mutex _activeFileReferencesMutex; - typedef std::lock_guard<std::mutex> LockGuard; - std::vector<vespalib::string> _activeFileReferences; - - bool /*changed*/ - updateActiveFileReferences(const std::vector<vespalib::string>& fileReferences); - - Path getPeerEntryPath(const std::string& fileReference); -public: - FileDistributionModelImpl(const std::string& hostName, int port, const std::shared_ptr<ZKFacade>& zk) - :_hostName(hostName), - _port(port), - _zk(zk), - _fileDBModel(_zk) - { - /* Hack: Force the first call to updateActiveFileReferences to return changed=true - when the file references config is empty. - This ensures that the "deployed files to download" nodes in zookeeper are read at start up. - */ - _activeFileReferences.push_back("force-initial-files-to-download-changed-signal"); - } - - ~FileDistributionModelImpl(); - - //overrides FileDistributionModel - FileDBModel& getFileDBModel() override { - return _fileDBModel; - } - - std::set<std::string> getFilesToDownload() override; - - PeerEntries getPeers(const std::string& fileReference, size_t maxPeers) override; - void addPeer(const std::string& fileReference) override; - void removePeer(const std::string& fileReference) override; - void peerFinished(const std::string& fileReference) override; - void addConfigServersAsPeers(std::vector<std::string>& peers, char const* envConfigServer, int port); - - //Overrides Subscriber - void configure(std::unique_ptr<FilereferencesConfig> config) override; -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h deleted file mode 100644 index 70f9472caab..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "filedistributionmodel.h" -#include <algorithm> -#include <vector> - -namespace filedistribution { - -class MockFileDBModel : public FileDBModel { - std::vector<std::string> _fileReferences; -public: - bool hasFile(const std::string& fileReference) override { - return std::find(_fileReferences.begin(), _fileReferences.end(), fileReference) != _fileReferences.end(); - } - - void addFile(const std::string& fileReference, const Buffer & buffer) override { - (void)buffer; - _fileReferences.push_back(fileReference); - } - - Buffer getFile(const std::string& fileReference) override { - (void)fileReference; - const char* resultStr = "result"; - Buffer result(resultStr, resultStr + strlen(resultStr)); - return result; - } - - void cleanFiles(const std::vector<std::string> &) override {} - - void setDeployedFilesToDownload(const std::string&, const std::string&, - const std::vector<std::string> &) override {} - void cleanDeployedFilesToDownload(const std::vector<std::string> &, - const std::string&) override {} - void removeDeploymentsThatHaveDifferentApplicationId(const std::vector<std::string> &, - const std::string&) override {} - - std::vector<std::string> getHosts() override { - return std::vector<std::string>(); - } - - HostStatus getHostStatus(const std::string&) override { - return HostStatus(); - } - - Progress getProgress(const std::string&, const std::vector<std::string>&) override { - return Progress(); - } -}; - - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp deleted file mode 100644 index de410289ec0..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp +++ /dev/null @@ -1,605 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "zkfacade.h" -#include <vespa/vespalib/net/socket_address.h> -#include <vespa/filedistribution/common/logfwd.h> -#include <vespa/defaults.h> -#include <vespa/vespalib/util/gate.h> -#include <vespa/vespalib/text/stringtokenizer.h> -#include <zookeeper/zookeeper.h> -#include <sstream> -#include <thread> -#include <boost/function_output_iterator.hpp> - -typedef std::unique_lock<std::mutex> UniqueLock; - -using filedistribution::ZKFacade; -using filedistribution::Buffer; -using filedistribution::ZKGenericException; -using filedistribution::ZKException; -using filedistribution::ZKLogging; -using filedistribution::Path; - -namespace { - -std::string -toErrorMsg(int zkStatus) { - switch(zkStatus) { - //System errors - case ZRUNTIMEINCONSISTENCY: - return "Zookeeper: A runtime inconsistency was found(ZRUNTIMEINCONSISTENCY)"; - case ZDATAINCONSISTENCY: - return "Zookeeper: A data inconsistency was found(ZDATAINCONSISTENCY)"; - case ZCONNECTIONLOSS: - return "Zookeeper: Connection to the server has been lost(ZCONNECTIONLOSS)"; - case ZMARSHALLINGERROR: - return "Zookeeper: Error while marshalling or unmarshalling data(ZMARSHALLINGERROR)"; - case ZUNIMPLEMENTED: - return "Zookeeper: Operation is unimplemented(ZUNIMPLEMENTED)"; - case ZOPERATIONTIMEOUT: - return "Zookeeper: Operation timeout(ZOPERATIONTIMEOUT)"; - case ZBADARGUMENTS: - return "Zookeeper: Invalid arguments(ZBADARGUMENTS)"; - case ZINVALIDSTATE: - return "Zookeeper: The connection with the zookeeper servers timed out(ZINVALIDSTATE)."; - - //API errors - case ZNONODE: - return "Zookeeper: Node does not exist(ZNONODE)"; - case ZNOAUTH: - return "Zookeeper: Not authenticated(ZNOAUTH)"; - case ZBADVERSION: - return "Zookeeper: Version conflict(ZBADVERSION)"; - case ZNOCHILDRENFOREPHEMERALS: - return "Zookeeper: Ephemeral nodes may not have children(ZNOCHILDRENFOREPHEMERALS)"; - case ZNODEEXISTS: - return "Zookeeper: The node already exists(ZNODEEXISTS)"; - case ZNOTEMPTY: - return "Zookeeper: The node has children(ZNOTEMPTY)"; - case ZSESSIONEXPIRED: - return "Zookeeper: The session has been expired by the server(ZSESSIONEXPIRED)"; - case ZINVALIDCALLBACK: - return "Zookeeper: Invalid callback specified(ZINVALIDCALLBACK)"; - case ZINVALIDACL: - return "Zookeeper: Invalid ACL specified(ZINVALIDACL)"; - case ZAUTHFAILED: - return "Zookeeper: Client authentication failed(ZAUTHFAILED)"; - case ZCLOSING: - return "Zookeeper: ZooKeeper is closing(ZCLOSING)"; - case ZNOTHING: - return "Zookeeper: No server responses to process(ZNOTHING)"; - default: - LOGFWD(error, "In ZKGenericException::what(): Invalid error code %d", zkStatus); - return "Zookeeper: Invalid error code."; - } -} - -class RetryController { - unsigned int _retryCount; - ZKFacade& _zkFacade; - - static const unsigned int _maxRetries = 10; -public: - int _lastStatus; - - RetryController(ZKFacade* zkFacade) - :_retryCount(0), - _zkFacade(*zkFacade), - _lastStatus(0) - {} - - void operator()(int status) { - _lastStatus = status; - } - - bool shouldRetry() { - ++_retryCount; - - return _zkFacade.retriesEnabled() && - _lastStatus != ZOK && - _retryCount < _maxRetries && - isNonLastingError(_lastStatus) && - pause(); - } - - bool isNonLastingError(int error) { - return error == ZCONNECTIONLOSS || - error == ZOPERATIONTIMEOUT; - } - - bool pause() { - unsigned int sleepInSeconds = 1; - sleep(sleepInSeconds); - LOGFWD(info, "Retrying zookeeper operation."); - return true; - } - - void throwIfError(const Path & path) { - namespace fd = filedistribution; - - switch (_lastStatus) { - case ZSESSIONEXPIRED: - throw fd::ZKSessionExpired(path.string(), VESPA_STRLOC); - case ZNONODE: - throw fd::ZKNodeDoesNotExistsException(path.string(), VESPA_STRLOC); - case ZNODEEXISTS: - throw fd::ZKNodeExistsException(path.string(), VESPA_STRLOC); - case ZCONNECTIONLOSS: - throw fd::ZKConnectionLossException(path.string(), VESPA_STRLOC); - case ZOPERATIONTIMEOUT: - throw fd::ZKOperationTimeoutException(path.string(), VESPA_STRLOC); - default: - if (_lastStatus != ZOK) { - throw fd::ZKGenericException(_lastStatus, toErrorMsg(_lastStatus) + " : " + path.string(), VESPA_STRLOC); - } - } - } -}; - -class DeallocateZKStringVectorGuard { - String_vector& _strings; -public: - DeallocateZKStringVectorGuard(String_vector& strings) - :_strings(strings) - {} - - ~DeallocateZKStringVectorGuard() { - deallocate_String_vector(&_strings); - } -}; - -const Path -setDataForNewFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle, int createFlags) { - - RetryController retryController(&zk); - const int maxPath = 1024; - char createdPath[maxPath]; - do { - retryController( zoo_create(zhandle, path.string().c_str(), buffer, length, &ZOO_OPEN_ACL_UNSAFE, createFlags, createdPath, maxPath)); - } while (retryController.shouldRetry()); - Path newPath(createdPath); - retryController.throwIfError(newPath); - return newPath; -} - -void -setDataForExistingFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle) { - RetryController retryController(&zk); - - const int ignoreVersion = -1; - do { - retryController(zoo_set(zhandle, path.string().c_str(), buffer, length, ignoreVersion)); - } while (retryController.shouldRetry()); - - retryController.throwIfError(path); -} - -} //anonymous namespace - -namespace filedistribution { - -VESPA_IMPLEMENT_EXCEPTION(ZKNodeDoesNotExistsException, ZKException); -VESPA_IMPLEMENT_EXCEPTION(ZKConnectionLossException, ZKException); -VESPA_IMPLEMENT_EXCEPTION(ZKNodeExistsException, ZKException); -VESPA_IMPLEMENT_EXCEPTION(ZKFailedConnecting, ZKException); -VESPA_IMPLEMENT_EXCEPTION(ZKSessionExpired, ZKException); -VESPA_IMPLEMENT_EXCEPTION(ZKOperationTimeoutException, ZKException); -VESPA_IMPLEMENT_EXCEPTION_SPINE(ZKGenericException); - -} - -/********** Active watchers *******************************************/ -struct ZKFacade::ZKWatcher { - const std::weak_ptr<ZKFacade> _owner; - const NodeChangedWatcherSP _nodeChangedWatcher; - - ZKWatcher( - const std::shared_ptr<ZKFacade> &owner, - const NodeChangedWatcherSP& nodeChangedWatcher ) - :_owner(owner), - _nodeChangedWatcher(nodeChangedWatcher) - {} - - static void watcherFn(zhandle_t *zh, int type, - int state, const char *path,void *watcherContext) { - - (void)zh; - (void)state; - (void)path; - - if (type == ZOO_SESSION_EVENT) { - //The session events do not cause unregistering of the watcher - //inside zookeeper, so don't unregister it here. - LOGFWD(debug, "ZKWatcher recieved session event with state '%d'. Ignoring", state); - return; - } - - LOGFWD(debug, "ZKWatcher: Begin watcher called for path '%s' with type %d.", path, type); - - ZKWatcher* self = static_cast<ZKWatcher*>(watcherContext); - - //WARNING: Since we're creating a shared_ptr to ZKFacade here, this might cause - //destruction of the ZKFacade in a zookeeper thread. - //Since zookeeper_close blocks until all watcher threads are finished, and we're inside a watcher thread, - //this will cause infinite waiting. - //To avoid this, a custom shared_ptr deleter using a separate deleter thread must be used. - - if (std::shared_ptr<ZKFacade> zk = self->_owner.lock()) { - zk->invokeWatcher(watcherContext); - } - - LOGFWD(debug, "ZKWatcher: End watcher called for path '%s' with type %d.", path, type); - } -}; - -void -ZKFacade::stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context) { - (void)context; - - //The ZKFacade won't expire before zookeeper_close has finished. - try { - if (type == ZOO_SESSION_EVENT) { - LOGFWD(debug, "Zookeeper session event: %d", state); - if (state == ZOO_EXPIRED_SESSION_STATE) { - throw ZKSessionExpired(path, VESPA_STRLOC); - } else if (state == ZOO_AUTH_FAILED_STATE) { - throw ZKGenericException(ZNOAUTH, path, VESPA_STRLOC); - } - } else { - LOGFWD(info, "State watching function: Unexpected event: '%d' -- '%d' ", type, state); - } - } catch (ZKSessionExpired & e) { - LOGFWD(error, "Received ZKSessionExpired exception that I can not handle. Will just exit quietly : %s", e.what()); - std::quick_exit(11); - } -} - - -void* /* watcherContext */ -ZKFacade::registerWatcher(const NodeChangedWatcherSP& watcher) { - - UniqueLock lock(_watchersMutex); - std::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher)); - _watchers[zkWatcher.get()] = zkWatcher; - return zkWatcher.get(); -} - - -std::shared_ptr<ZKFacade::ZKWatcher> -ZKFacade::unregisterWatcher(void* watcherContext) { - UniqueLock lock(_watchersMutex); - - WatchersMap::iterator i = _watchers.find(watcherContext); - if (i == _watchers.end()) { - return std::shared_ptr<ZKWatcher>(); - } else { - std::shared_ptr<ZKWatcher> result = i->second; - _watchers.erase(i); - return result; - } -} - -void -ZKFacade::invokeWatcher(void* watcherContext) { - std::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext); - - if (!_watchersEnabled) - return; - - if (watcher) { - try { - (*watcher->_nodeChangedWatcher)(); - } catch (const ZKConnectionLossException & e) { - LOGFWD(error, "Got connection loss exception while invoking watcher : %s", e.what()); - std::quick_exit(12); - } - } else { - LOGFWD(error, "Invoke called on expired watcher."); - } -} - -/********** End live watchers ***************************************/ - -std::string -ZKFacade::getValidZKServers(const std::string &input, bool ignoreDNSFailure) { - if (ignoreDNSFailure) { - vespalib::StringTokenizer tokenizer(input, ","); - vespalib::string validServers; - for (vespalib::string spec : tokenizer) { - vespalib::StringTokenizer addrTokenizer(spec, ":"); - vespalib::string address = addrTokenizer[0]; - vespalib::string port = addrTokenizer[1]; - if ( !vespalib::SocketAddress::resolve(atoi(port.c_str()), address.c_str()).empty()) { - if ( !validServers.empty() ) { - validServers += ','; - } - validServers += spec; - } - } - return validServers; - } - return input; -} - - -ZKFacade::ZKFacade(const std::string& zkservers, bool allowDNSFailure) - :_retriesEnabled(true), - _watchersEnabled(true), - _zhandle(zookeeper_init(getValidZKServers(zkservers, allowDNSFailure).c_str(), - &ZKFacade::stateWatchingFun, - _zkSessionTimeOut, - 0, //clientid, - this, //context, - 0)) //flags -{ - if (!_zhandle) { - throw ZKFailedConnecting("No zhandle", VESPA_STRLOC); - } -} - -ZKFacade::~ZKFacade() { - disableRetries(); - _watchersEnabled = false; - vespalib::Gate done; - std::thread closer([&done, zhandle=_zhandle] () { zookeeper_close(zhandle); done.countDown(); }); - if ( done.await(50*1000) ) { - LOGFWD(debug, "Zookeeper connection closed successfully."); - } else { - LOGFWD(error, "Not able to close down zookeeper. Dumping core so you can figure out what is wrong"); - abort(); - } - closer.join(); -} - -const std::string -ZKFacade::getString(const Path& path) { - Buffer buffer(getData(path)); - return std::string(buffer.begin(), buffer.end()); -} - -Buffer -ZKFacade::getData(const Path& path) { - RetryController retryController(this); - Buffer buffer(_maxDataSize); - int bufferSize = _maxDataSize; - - const int watchIsOff = 0; - do { - Stat stat; - bufferSize = _maxDataSize; - - retryController( zoo_get(_zhandle, path.string().c_str(), watchIsOff, &*buffer.begin(), &bufferSize, &stat)); - } while(retryController.shouldRetry()); - - retryController.throwIfError(path); - buffer.resize(bufferSize); - return buffer; -} - -Buffer -ZKFacade::getData(const Path& path, const NodeChangedWatcherSP& watcher) { - RegistrationGuard unregisterGuard(*this, watcher); - void* watcherContext = unregisterGuard.get(); - RetryController retryController(this); - - Buffer buffer(_maxDataSize); - int bufferSize = _maxDataSize; - - do { - Stat stat; - bufferSize = _maxDataSize; - - retryController( zoo_wget(_zhandle, path.string().c_str(), &ZKWatcher::watcherFn, watcherContext, &*buffer.begin(), &bufferSize, &stat)); - } while (retryController.shouldRetry()); - - retryController.throwIfError(path); - - buffer.resize(bufferSize); - unregisterGuard.release(); - return buffer; -} - -void -ZKFacade::setData(const Path& path, const Buffer& buffer, bool mustExist) { - return setData(path, &*buffer.begin(), buffer.size(), mustExist); -} - -void -ZKFacade::setData(const Path& path, const char* buffer, size_t length, bool mustExist) { - assert (length < _maxDataSize); - - if (mustExist || hasNode(path)) { - setDataForExistingFile(*this, path, buffer, length, _zhandle); - } else { - setDataForNewFile(*this, path, buffer, length, _zhandle, 0); - } -} - -const Path -ZKFacade::createSequenceNode(const Path& path, const char* buffer, size_t length) { - assert (length < _maxDataSize); - - int createFlags = ZOO_SEQUENCE; - return setDataForNewFile(*this, path, buffer, length, _zhandle, createFlags); -} - -bool -ZKFacade::hasNode(const Path& path) { - RetryController retryController(this); - do { - Stat stat; - const int noWatch = 0; - retryController( zoo_exists(_zhandle, path.string().c_str(), noWatch, &stat)); - } while(retryController.shouldRetry()); - - switch(retryController._lastStatus) { - case ZNONODE: - return false; - case ZOK: - return true; - default: - retryController.throwIfError(path); - //this should never happen: - assert(false); - return false; - } -} - -bool -ZKFacade::hasNode(const Path& path, const NodeChangedWatcherSP& watcher) { - RegistrationGuard unregisterGuard(*this, watcher); - void* watcherContext = unregisterGuard.get(); - RetryController retryController(this); - do { - Stat stat; - retryController(zoo_wexists(_zhandle, path.string().c_str(), &ZKWatcher::watcherFn, watcherContext, &stat)); - } while (retryController.shouldRetry()); - - bool retval(false); - switch(retryController._lastStatus) { - case ZNONODE: - retval = false; - break; - case ZOK: - retval = true; - break; - default: - retryController.throwIfError(path); - //this should never happen: - assert(false); - retval = false; - break; - } - unregisterGuard.release(); - return retval; -} - -void -ZKFacade::addEphemeralNode(const Path& path) { - try { - setDataForNewFile(*this, path, "", 0, _zhandle, ZOO_EPHEMERAL); - } catch(const ZKNodeExistsException& e) { - remove(path); - addEphemeralNode(path); - } -} - -void -ZKFacade::remove(const Path& path) { - std::vector< std::string > children = getChildren(path); - if (!children.empty()) { - std::for_each(children.begin(), children.end(), [&](const std::string & s){ remove(path / s); }); - } - - RetryController retryController(this); - do { - int ignoreVersion = -1; - retryController( zoo_delete(_zhandle, path.string().c_str(), ignoreVersion)); - } while (retryController.shouldRetry()); - - if (retryController._lastStatus != ZNONODE) { - retryController.throwIfError(path); - } -} - -void -ZKFacade::removeIfExists(const Path& path) { - try { - if (hasNode(path)) { - remove(path); - } - } catch (const ZKNodeDoesNotExistsException& e) { - //someone else removed it concurrently, not a problem. - } -} - -void -ZKFacade::retainOnly(const Path& path, const std::vector<std::string>& childrenToPreserve) { - typedef std::vector<std::string> Children; - - Children current = getChildren(path); - std::sort(current.begin(), current.end()); - - Children toPreserveSorted(childrenToPreserve); - std::sort(toPreserveSorted.begin(), toPreserveSorted.end()); - - std::set_difference(current.begin(), current.end(), - toPreserveSorted.begin(), toPreserveSorted.end(), - boost::make_function_output_iterator([&](const std::string & s){ remove(path / s); })); -} - -std::vector< std::string > -ZKFacade::getChildren(const Path& path) { - RetryController retryController(this); - String_vector children; - do { - const bool watch = false; - retryController( zoo_get_children(_zhandle, path.string().c_str(), watch, &children)); - } while (retryController.shouldRetry()); - - retryController.throwIfError(path); - - DeallocateZKStringVectorGuard deallocateGuard(children); - - typedef std::vector<std::string> ResultType; - ResultType result; - result.reserve(children.count); - - std::copy(children.data, children.data + children.count, std::back_inserter(result)); - - return result; -} - -std::vector< std::string > -ZKFacade::getChildren(const Path& path, const NodeChangedWatcherSP& watcher) { - RegistrationGuard unregisterGuard(*this, watcher); - void* watcherContext = unregisterGuard.get(); - - RetryController retryController(this); - String_vector children; - do { - retryController( zoo_wget_children(_zhandle, path.string().c_str(), &ZKWatcher::watcherFn, watcherContext, &children)); - } while (retryController.shouldRetry()); - - retryController.throwIfError(path); - - DeallocateZKStringVectorGuard deallocateGuard(children); - - typedef std::vector<std::string> ResultType; - ResultType result; - result.reserve(children.count); - - std::copy(children.data, children.data + children.count, std::back_inserter(result)); - - unregisterGuard.release(); - return result; -} - - -void -ZKFacade::disableRetries() { - _retriesEnabled = false; -} - -ZKLogging::ZKLogging() : - _file(nullptr) -{ - std::string filename = vespa::Defaults::underVespaHome("tmp/zookeeper.log"); - _file = std::fopen(filename.c_str(), "w"); - if (_file == nullptr) { - LOGFWD(error, "Could not open file '%s'", filename.c_str()); - } else { - zoo_set_log_stream(_file); - } - - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); -} - -ZKLogging::~ZKLogging() -{ - zoo_set_log_stream(nullptr); - if (_file != nullptr) { - std::fclose(_file); - _file = nullptr; - } -} diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.h b/filedistribution/src/vespa/filedistribution/model/zkfacade.h deleted file mode 100644 index f2451d8191b..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/zkfacade.h +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <string> -#include <vector> -#include <map> -#include <mutex> - -#include <vespa/filedistribution/common/buffer.h> -#include <vespa/filedistribution/common/exception.h> -#include <vespa/vespalib/util/exception.h> - -struct _zhandle; -typedef _zhandle zhandle_t; - -namespace filedistribution { - -class ZKException : public vespalib::Exception { -protected: - using vespalib::Exception::Exception; -}; - -VESPA_DEFINE_EXCEPTION(ZKNodeDoesNotExistsException, ZKException); -VESPA_DEFINE_EXCEPTION(ZKConnectionLossException, ZKException); -VESPA_DEFINE_EXCEPTION(ZKNodeExistsException, ZKException); -VESPA_DEFINE_EXCEPTION(ZKFailedConnecting, ZKException); -VESPA_DEFINE_EXCEPTION(ZKOperationTimeoutException, ZKException); -VESPA_DEFINE_EXCEPTION(ZKSessionExpired, ZKException); - -class ZKGenericException : public ZKException { -public: - ZKGenericException(int zkStatus, const vespalib::stringref &msg, const vespalib::stringref &location = "", int skipStack = 0) : - ZKException(msg, location, skipStack), - _zkStatus(zkStatus) - { } - ZKGenericException(int zkStatus, const vespalib::Exception &cause, const vespalib::stringref &msg = "", - const vespalib::stringref &location = "", int skipStack = 0) : - ZKException(msg, cause, location, skipStack), - _zkStatus(zkStatus) - { } - VESPA_DEFINE_EXCEPTION_SPINE(ZKGenericException); -private: - const int _zkStatus; -}; - -class ZKFacade : public std::enable_shared_from_this<ZKFacade> { - volatile bool _retriesEnabled; - volatile bool _watchersEnabled; - - zhandle_t* _zhandle; - const static int _zkSessionTimeOut = 30 * 1000; - const static size_t _maxDataSize = 1024 * 1024; - - class ZKWatcher; - static void stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context); -public: - typedef std::shared_ptr<ZKFacade> SP; - - /* Lifetime is managed by ZKFacade. - Derived classes should only contain weak_ptrs to other objects - to avoid linking their lifetime to the ZKFacade lifetime. - */ - class NodeChangedWatcher { - public: - NodeChangedWatcher(const NodeChangedWatcher &) = delete; - NodeChangedWatcher & operator = (const NodeChangedWatcher &) = delete; - NodeChangedWatcher() = default; - virtual ~NodeChangedWatcher() {}; - virtual void operator()() = 0; - }; - - typedef std::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP; - - ZKFacade(const ZKFacade &) = delete; - ZKFacade & operator = (const ZKFacade &) = delete; - ZKFacade(const std::string& zkservers, bool allowDNSFailure); - ~ZKFacade(); - - bool hasNode(const Path&); - bool hasNode(const Path&, const NodeChangedWatcherSP&); - - const std::string getString(const Path&); - Buffer getData(const Path&); //throws ZKNodeDoesNotExistsException - //if watcher is specified, it will be set even if the node does not exists - Buffer getData(const Path&, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException - - //Parent path must exist - void setData(const Path&, const Buffer& buffer, bool mustExist = false); - void setData(const Path&, const char* buffer, size_t length, bool mustExist = false); - - const Path createSequenceNode(const Path&, const char* buffer, size_t length); - - void remove(const Path&); //throws ZKNodeDoesNotExistsException - void removeIfExists(const Path&); - - void retainOnly(const Path&, const std::vector<std::string>& children); - - void addEphemeralNode(const Path& path); - std::vector<std::string> getChildren(const Path& path); - std::vector<std::string> getChildren(const Path& path, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException - - //only for use by shutdown code. - void disableRetries(); - bool retriesEnabled() { - return _retriesEnabled; - } - - static std::string getValidZKServers(const std::string &input, bool ignoreDNSFailure); - -private: - class RegistrationGuard { - public: - RegistrationGuard & operator = (const RegistrationGuard &) = delete; - RegistrationGuard(const RegistrationGuard &) = delete; - RegistrationGuard(ZKFacade & zk, const NodeChangedWatcherSP & watcher) : _zk(zk), _watcherContext(_zk.registerWatcher(watcher)) { } - ~RegistrationGuard() { - if (_watcherContext) { - _zk.unregisterWatcher(_watcherContext); - } - } - void * get() { return _watcherContext; } - void release() { _watcherContext = nullptr; } - private: - ZKFacade & _zk; - void * _watcherContext; - }; - void* registerWatcher(const NodeChangedWatcherSP &); //returns watcherContext - std::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext); - void invokeWatcher(void* watcherContext); - - std::mutex _watchersMutex; - typedef std::map<void*, std::shared_ptr<ZKWatcher> > WatchersMap; - WatchersMap _watchers; -}; - -class ZKLogging { -public: - ZKLogging(); - ~ZKLogging(); - ZKLogging(const ZKLogging &) = delete; - ZKLogging & operator = (const ZKLogging &) = delete; -private: - FILE * _file; -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp deleted file mode 100644 index 9931b104010..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp +++ /dev/null @@ -1,301 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "filedistributionmodel.h" -#include "zkfacade.h" -#include "zkfiledbmodel.h" -#include "deployedfilestodownload.h" -#include <vespa/filedistribution/common/logfwd.h> -#include <sys/file.h> - -namespace fs = boost::filesystem; - -namespace filedistribution { - -namespace { - -fs::path -createPath(const std::string& fileReference) { - return ZKFileDBModel::_fileDBPath / fileReference; -} - -void -createNode(const fs::path & path, ZKFacade& zk) { - if (!zk.hasNode(path)) - zk.setData(path, "", 0); -} - -bool -isEntryForHost(const std::string& host, const std::string& peerEntry) { - return host.size() < peerEntry.size() && - std::equal(host.begin(), host.end(), peerEntry.begin()) && - peerEntry[host.size()] == ZKFileDBModel::_peerEntrySeparator; -} - -std::vector<std::string> -getSortedChildren(ZKFacade& zk, const Path& path) { - std::vector<std::string> children = zk.getChildren(path); - std::sort(children.begin(), children.end()); - return children; -} - -} //anonymous namespace - -VESPA_IMPLEMENT_EXCEPTION(InvalidProgressException, vespalib::Exception); -VESPA_IMPLEMENT_EXCEPTION(InvalidHostStatusException, vespalib::Exception); - -const Path ZKFileDBModel::_root = "/vespa/filedistribution"; -const Path ZKFileDBModel::_fileDBPath = _root / "files"; -const Path ZKFileDBModel::_hostsPath = _root / "hosts"; - -bool -ZKFileDBModel::hasFile(const std::string& fileReference) { - return _zk->hasNode(createPath(fileReference)); -} - -void -ZKFileDBModel::addFile(const std::string& fileReference, const Buffer& buffer) { - return _zk->setData(createPath(fileReference), buffer); -} - -Buffer -ZKFileDBModel::getFile(const std::string& fileReference) { - try { - return _zk->getData(createPath(fileReference)); - } catch(const ZKNodeDoesNotExistsException & e) { - throw FileDoesNotExistException(fileReference, e, VESPA_STRLOC); - } -} - -void -ZKFileDBModel::setDeployedFilesToDownload( - const std::string& hostName, - const std::string& appId, - const std::vector<std::string>& files) { - DeployedFilesToDownload d(_zk.get()); - d.setDeployedFilesToDownload(hostName, appId, files); -} - -void -ZKFileDBModel::cleanDeployedFilesToDownload( - const std::vector<std::string>& hostsToPreserve, - const std::string& appId) { - - std::vector<std::string> allHosts = getHosts(); - std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end()); - - for (auto & host : allHosts) { - Path hostPath = _hostsPath / host; - try { - // If this host is NOT part of hosts to deploy to - if (toPreserve.find(host) == toPreserve.end()) { - removeDeployFileNodes(hostPath, appId); - if (canRemoveHost(hostPath, appId)) { - _zk->remove(hostPath); - } - } - } catch (const ZKNodeDoesNotExistsException& e) { - LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str()); - } - } -} - -void -ZKFileDBModel::removeDeploymentsThatHaveDifferentApplicationId( - const std::vector<std::string>& hostsToPreserve, - const std::string& appId) { - - std::vector<std::string> allHosts = getHosts(); - std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end()); - - for (auto & host : allHosts) { - Path hostPath = _hostsPath / host; - try { - if (toPreserve.find(host) != toPreserve.end()) { - removeNonApplicationFiles(hostPath, appId); - } - } catch (const ZKNodeDoesNotExistsException& e) { - LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str()); - } - } -} - - -// Delete files which do not belong to this application. -void -ZKFileDBModel::removeNonApplicationFiles(const Path & hostPath, const std::string& appId) -{ - std::vector<std::string> deployNodes = _zk->getChildren(hostPath); - for (auto & deployNode : deployNodes) { - Path deployNodePath = hostPath / deployNode; - std::string applicationId(readApplicationId(*_zk, deployNodePath)); - if (appId != applicationId) { - _zk->remove(deployNodePath); - } - } -} - -void -ZKFileDBModel::removeDeployFileNodes(const Path & hostPath, const std::string& appId) { - std::vector<std::string> deployNodes = _zk->getChildren(hostPath); - for (auto & deployNode : deployNodes) { - Path deployNodePath = hostPath / deployNode; - std::string applicationId(readApplicationId(*_zk, deployNodePath)); - if (appId == applicationId) { - _zk->remove(deployNodePath); - } - } -} - -bool -ZKFileDBModel::canRemoveHost(const Path & hostPath, const std::string& appId) { - std::vector<std::string> deployNodes = _zk->getChildren(hostPath); - for (auto & deployNode : deployNodes) { - Path deployNodePath = hostPath / deployNode; - std::string applicationId(readApplicationId(*_zk, deployNodePath)); - if (appId != applicationId) { - return false; - } - } - return true; -} - -std::vector<std::string> -ZKFileDBModel::getHosts() { - try { - return _zk->getChildren(_hostsPath); - } catch(ZKNodeDoesNotExistsException&) { - LOGFWD(debug, "No files to be distributed."); - return std::vector<std::string>(); - } -} - -namespace { -const ZKFileDBModel::Progress::value_type notStarted = 101; -}; - -//TODO: Refactor -ZKFileDBModel::HostStatus -ZKFileDBModel::getHostStatus(const std::string& hostName) { - typedef std::vector<std::string> PeerEntries; - - DeployedFilesToDownload d(_zk.get()); - DeployedFilesToDownload::FileReferences filesToDownload = d.getLatestDeployedFilesToDownload(hostName); - - HostStatus hostStatus; - hostStatus._state = HostStatus::notStarted; - hostStatus._numFilesToDownload = filesToDownload.size(); - hostStatus._numFilesFinished = 0; - - for (const std::string & file : filesToDownload) { - Path path = getPeersPath(file); - - const PeerEntries peerEntries = getSortedChildren(*_zk, path); - PeerEntries::const_iterator candidate = - std::lower_bound(peerEntries.begin(), peerEntries.end(), hostName); - - if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate)) { - char fileProgressPercentage = getProgress(path / (*candidate)); - if (fileProgressPercentage == 100) { - hostStatus._numFilesFinished++; - } else if (fileProgressPercentage != notStarted) { - hostStatus._state = HostStatus::inProgress; - } - - candidate++; - if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate)) - throw InvalidHostStatusException(path.string(), VESPA_STRLOC); - } - } - - - if (hostStatus._numFilesToDownload == hostStatus._numFilesFinished) { - hostStatus._state = HostStatus::finished; - } - - return hostStatus; -} - -void -ZKFileDBModel::cleanFiles(const std::vector<std::string>& filesToPreserve) { - _zk->retainOnly(_fileDBPath, filesToPreserve); -} - -ZKFileDBModel::ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk) - : _zk(zk) -{ - createNode(_root, *_zk); - createNode(_fileDBPath, *_zk); - createNode(_hostsPath, *_zk); -} - -ZKFileDBModel::~ZKFileDBModel() {} - -char -ZKFileDBModel::getProgress(const Path& path) { - try { - Buffer buffer(_zk->getData(path)); - if (buffer.size() == 1) - return buffer[0]; - else if (buffer.size() == 0) - return 0; - else { - throw InvalidProgressException(path.string(), VESPA_STRLOC); - } - } catch (ZKNodeDoesNotExistsException& e) { - //progress information deleted - return notStarted; - } -} - -ZKFileDBModel::Progress -ZKFileDBModel::getProgress(const std::string& fileReference, - const std::vector<std::string>& hostsSortedAscending) { - Path path = getPeersPath(fileReference); - - Progress progress; - progress.reserve(hostsSortedAscending.size()); - - typedef std::vector<std::string> PeerEntries; - const PeerEntries peerEntries = getSortedChildren(*_zk, path); - - PeerEntries::const_iterator current = peerEntries.begin(); - for (const std::string& host : hostsSortedAscending) { - PeerEntries::const_iterator candidate = - std::lower_bound(current, peerEntries.end(), host); - - ZKFileDBModel::Progress::value_type hostProgress = notStarted; - if (candidate != peerEntries.end()) { - current = candidate; - if (isEntryForHost(host, *current)) - hostProgress = getProgress(path / (*candidate)); - } - progress.push_back(hostProgress); - } - return progress; -} - -FileDBModel::~FileDBModel() {} - -DirectoryGuard::DirectoryGuard(Path path) : - _fd(-1) -{ - _fd = open(path.c_str(), O_RDONLY); - assert(_fd != -1); - int retval = flock(_fd, LOCK_EX); - while ((retval != 0) && (errno == EINTR)) { - std::cout << "Got interupted while flock'ing ' " << path << std::endl; - retval = flock(_fd, LOCK_EX); - } - assert(retval == 0); -} - -DirectoryGuard::~DirectoryGuard() { - if (_fd != -1) { - int retval = flock(_fd, LOCK_UN); - assert(retval == 0); - retval = close(_fd); - assert(retval ==0); - } -} - -} diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h deleted file mode 100644 index 4a89a9547a9..00000000000 --- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "filedistributionmodel.h" -#include "zkfacade.h" - -namespace filedistribution { - -class ZKFileDBModel : public FileDBModel { -private: - const std::shared_ptr<ZKFacade> _zk; - char getProgress(const Path& path); - void removeDeployFileNodes(const Path& hostPath, const std::string& appId); - bool canRemoveHost(const Path& hostPath, const std::string& appId); -public: - const static Path _root; - const static Path _fileDBPath; - const static Path _hostsPath; - - const static char _peerEntrySeparator = ':'; - - Path getPeersPath(const std::string& fileReference) { - return _fileDBPath / fileReference; - } - - //overrides - bool hasFile(const std::string& fileReference) override; - void addFile(const std::string& fileReference, const Buffer& buffer) override; - Buffer getFile(const std::string& fileReference) override; - void cleanFiles(const std::vector<std::string>& filesToPreserve) override; - - void setDeployedFilesToDownload(const std::string& hostName, - const std::string & appId, - const std::vector<std::string> & files) override; - void cleanDeployedFilesToDownload( - const std::vector<std::string> & hostsToPreserve, - const std::string& appId) override; - void removeDeploymentsThatHaveDifferentApplicationId( - const std::vector<std::string> & hostsToPreserve, - const std::string& appId) override; - void removeNonApplicationFiles( - const Path & hostPath, - const std::string& appId); - std::vector<std::string> getHosts() override; - HostStatus getHostStatus(const std::string& hostName) override; - - ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk); - ~ZKFileDBModel(); - - Progress getProgress(const std::string& fileReference, - const std::vector<std::string>& hostsSortedAscending) override; -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt deleted file mode 100644 index bc03f71eae7..00000000000 --- a/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(filedistribution_filedistributorrpc STATIC - SOURCES - filedistributorrpc.cpp - DEPENDS -) diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp deleted file mode 100644 index ff1248af157..00000000000 --- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp +++ /dev/null @@ -1,267 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "filedistributorrpc.h" -#include <mutex> - -#include <vespa/fnet/frt/frt.h> -#include <vespa/frtstream/frtserverstream.h> -#include <map> - -#include "fileprovider.h" -#include <vespa/filedistribution/model/filedbmodel.h> -#include <vespa/log/log.h> -LOG_SETUP(".filedistributorrpc"); - -using filedistribution::FileDistributorRPC; -using filedistribution::FileProvider; - -namespace fs = boost::filesystem; - -namespace { -typedef std::lock_guard<std::mutex> LockGuard; - -struct RPCErrorCodes { - const static uint32_t baseErrorCode = 0x10000; - const static uint32_t baseFileProviderErrorCode = baseErrorCode + 0x1000; - - const static uint32_t unknownError = baseErrorCode + 1; -}; - -class QueuedRequests { - bool _shuttingDown; - - std::mutex _mutex; - typedef std::multimap<std::string, FRT_RPCRequest*> Map; - Map _queuedRequests; - - template <class FUNC> - void returnAnswer(const std::string& fileReference, FUNC func) { - LockGuard guard(_mutex); - - typedef Map::iterator iterator; - std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference); - - for (iterator it(range.first); it != range.second; it++) { - const Map::value_type & request(*it); - LOG(info, "Returning earlier enqueued request for file reference '%s'.", request.first.c_str()); - func(*request.second); - request.second->Return(); - } - - _queuedRequests.erase(range.first, range.second); - } - - struct DownloadFinished { - const std::string& _path; - - void operator()(FRT_RPCRequest& request) { - LOG(info, "Download finished: '%s'", _path.c_str()); - frtstream::FrtServerStream requestHandler(&request); - requestHandler <<_path; - } - - DownloadFinished(const std::string& path) - :_path(path) - {} - }; - - struct DownloadFailed { - FileProvider::FailedDownloadReason _reason; - - void operator()(FRT_RPCRequest& request) { - LOG(info, "Download failed: '%d'", _reason); - request.SetError(RPCErrorCodes::baseFileProviderErrorCode + _reason, "Download failed"); - } - - DownloadFailed(FileProvider::FailedDownloadReason reason) - :_reason(reason) - {} - }; - -public: - QueuedRequests() - :_shuttingDown(false) - {} - - void enqueue(const std::string& fileReference, FRT_RPCRequest* request) { - LockGuard guard(_mutex); - - if (_shuttingDown) { - LOG(info, "Shutdown: Aborting request for file reference '%s'.", fileReference.c_str()); - abort(request); - } else { - _queuedRequests.insert(std::make_pair(fileReference, request)); - } - } - - void abort(FRT_RPCRequest* request) { - request->SetError(FRTE_RPC_ABORT); - request->Return(); - } - - void dequeue(const std::string& fileReference, FRT_RPCRequest* request) { - LockGuard guard(_mutex); - - typedef Map::iterator iterator; - std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference); - - iterator candidate = std::find(range.first, range.second, - std::pair<const std::string, FRT_RPCRequest*>(fileReference, request)); - - if (candidate != range.second) - _queuedRequests.erase(candidate); - } - - void downloadFinished(const std::string& fileReference, const fs::path& path) { - - DownloadFinished handler(path.string()); - returnAnswer(fileReference, handler); - } - - void downloadFailed(const std::string& fileReference, FileProvider::FailedDownloadReason reason) { - - DownloadFailed handler(reason); - returnAnswer(fileReference, handler); - } - - void shutdown() { - LockGuard guard(_mutex); - _shuttingDown = true; - - for (const Map::value_type& request : _queuedRequests) { - LOG(info, "Shutdown: Aborting earlier enqueued request for file reference '%s'.", request.first.c_str()); - abort(request.second); - } - _queuedRequests.erase(_queuedRequests.begin(), _queuedRequests.end()); - } -}; - -} //anonymous namespace - -class FileDistributorRPC::Server : public FRT_Invokable { - public: - FileProvider::SP _fileProvider; - std::unique_ptr<FRT_Supervisor> _supervisor; - - QueuedRequests _queuedRequests; - - boost::signals2::scoped_connection _downloadCompletedConnection; - boost::signals2::scoped_connection _downloadFailedConnection; - - void queueRequest(const std::string& fileReference, FRT_RPCRequest* request); - void defineMethods(); - - Server(const Server &) = delete; - Server & operator = (const Server &) = delete; - Server(int listen_port, const FileProvider::SP & provider); - void start(const FileDistributorRPC::SP & parent); - ~Server(); - - void waitFor(FRT_RPCRequest*); -}; - -FileDistributorRPC:: -Server::Server(int listen_port, const FileProvider::SP & provider) - :_fileProvider(provider), - _supervisor(new FRT_Supervisor()) -{ - defineMethods(); - _supervisor->Listen(listen_port); - _supervisor->Start(); -} - - -FileDistributorRPC::Server::~Server() { - _queuedRequests.shutdown(); - - const bool waitForFinished = true; - _supervisor->ShutDown(waitForFinished); -} - -void -FileDistributorRPC::Server::start(const FileDistributorRPC::SP & parent) { - _downloadCompletedConnection = - _fileProvider->downloadCompleted().connect(FileProvider::DownloadCompletedSignal::slot_type( - [&] (const std::string &file, const fs::path& path) { _queuedRequests.downloadFinished(file, path); }) - .track_foreign(parent)); - - _downloadFailedConnection = - _fileProvider->downloadFailed().connect(FileProvider::DownloadFailedSignal::slot_type( - [&] (const std::string& file, FileProvider::FailedDownloadReason reason) { _queuedRequests.downloadFailed(file, reason); }) - .track_foreign(parent)); - - -} - -void -FileDistributorRPC:: -Server::queueRequest(const std::string& fileReference, FRT_RPCRequest* request) { - _queuedRequests.enqueue( fileReference, request ); - try { - _fileProvider->downloadFile(fileReference); - } catch(...) { - _queuedRequests.dequeue(fileReference, request); - throw; - } -} - -void -FileDistributorRPC::Server::defineMethods() { - const bool instant = true; - FRT_ReflectionBuilder builder(_supervisor.get()); - builder.DefineMethod("waitFor", "s", "s", instant, - FRT_METHOD(Server::waitFor), this); -} - -void -FileDistributorRPC::Server::waitFor(FRT_RPCRequest* request) { - try { - frtstream::FrtServerStream requestHandler(request); - std::string fileReference; - requestHandler >> fileReference; - boost::optional<fs::path> path = _fileProvider->getPath(fileReference); - if (path) { - LOG(debug, "Returning request for file reference '%s'.", fileReference.c_str()); - requestHandler << path->string(); - } else { - LOG(debug, "Enqueuing file request for file reference '%s'.", fileReference.c_str()); - request->Detach(); - queueRequest(fileReference, request); - } - } catch (const FileDoesNotExistException&) { - LOG(warning, "Received a request for a file reference that does not exist in zookeeper."); - request->SetError(RPCErrorCodes::baseFileProviderErrorCode + FileProvider::FileReferenceDoesNotExist, - "No such file reference"); - request->Return(); - } catch (const std::exception& e) { - LOG(error, "An exception occurred while calling the rpc method waitFor:%s", e.what()); - request->SetError(RPCErrorCodes::unknownError, e.what()); - request->Return(); //the request might be detached. - } -} - -FileDistributorRPC::FileDistributorRPC(const std::string& connectionSpec, - const FileProvider::SP & provider) - :_server(new Server(get_port(connectionSpec), provider)) -{} - -void -FileDistributorRPC::start() -{ - _server->start(shared_from_this()); -} - -int -FileDistributorRPC::get_port(const std::string &spec) -{ - const char *port = (spec.data() + spec.size()); - while ((port > spec.data()) && (port[-1] >= '0') && (port[-1] <= '9')) { - --port; - } - return atoi(port); -} - -filedistribution::FileDistributorRPC::~FileDistributorRPC() -{ - LOG(debug, "Deconstructing FileDistributorRPC"); -} diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h deleted file mode 100644 index cf9ed110d36..00000000000 --- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <memory> - -#include "fileprovider.h" - -namespace filedistribution { - -class FileDistributorRPC : public std::enable_shared_from_this<FileDistributorRPC> -{ - class Server; -public: - using SP = std::shared_ptr<FileDistributorRPC>; - FileDistributorRPC(const FileDistributorRPC &) = delete; - FileDistributorRPC & operator = (const FileDistributorRPC &) = delete; - FileDistributorRPC(const std::string& connectSpec, const FileProvider::SP & provider); - - void start(); - - static int get_port(const std::string &spec); - - ~FileDistributorRPC(); -private: - std::unique_ptr<Server> _server; -}; - -} //namespace filedistribution - diff --git a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h deleted file mode 100644 index a92d09408b1..00000000000 --- a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/filedistribution/common/exception.h> -#include<boost/optional.hpp> -#include<boost/signals2.hpp> - -namespace filedistribution { - -class FileProvider -{ -public: - using SP = std::shared_ptr<FileProvider>; - typedef boost::signals2::signal<void (const std::string& /* fileReference */, const Path&)> DownloadCompletedSignal; - typedef DownloadCompletedSignal::slot_type DownloadCompletedHandler; - - enum FailedDownloadReason { - FileReferenceDoesNotExist, - FileReferenceRemoved - }; - - typedef boost::signals2::signal<void (const std::string& /* fileReference */, FailedDownloadReason)> DownloadFailedSignal; - typedef DownloadFailedSignal::slot_type DownloadFailedHandler; - - virtual boost::optional<Path> getPath(const std::string& fileReference) = 0; - virtual void downloadFile(const std::string& fileReference) = 0; - - virtual ~FileProvider() {} - - //Signals - virtual DownloadCompletedSignal& downloadCompleted() = 0; - virtual DownloadFailedSignal& downloadFailed() = 0; -}; - -} //namespace filedistribution - |