summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-02-08 17:30:00 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2018-02-08 17:30:00 +0000
commite4c657f4c777eb61e1c8d88aba5e52a0862212ff (patch)
treeeae98c8cad6125af035eac3c1bbb9530fbb15c69 /filedistribution
parente90d8aeecfd7c39826a2ee261c4320b3d2260ac0 (diff)
Remove filedistribution cpp code
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/CMakeLists.txt39
-rw-r--r--filedistribution/src/apps/.gitignore1
-rw-r--r--filedistribution/src/apps/filedistributor/.gitignore2
-rw-r--r--filedistribution/src/apps/filedistributor/CMakeLists.txt17
-rw-r--r--filedistribution/src/apps/filedistributor/filedistributor.cpp375
-rw-r--r--filedistribution/src/apps/status/.gitignore2
-rw-r--r--filedistribution/src/apps/status/CMakeLists.txt16
-rw-r--r--filedistribution/src/apps/status/status-filedistribution.cpp180
-rw-r--r--filedistribution/src/main/sh/vespa-status-filedistribution.sh (renamed from filedistribution/src/apps/status/vespa-status-filedistribution.sh)27
-rw-r--r--filedistribution/src/tests/.gitignore2
-rw-r--r--filedistribution/src/tests/common/.gitignore1
-rw-r--r--filedistribution/src/tests/common/CMakeLists.txt13
-rw-r--r--filedistribution/src/tests/common/testCommon.cpp41
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/.gitignore1
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt16
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp51
-rw-r--r--filedistribution/src/tests/filedownloader/.gitignore1
-rw-r--r--filedistribution/src/tests/filedownloader/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/filedownloader/testfiledownloader.cpp143
-rw-r--r--filedistribution/src/tests/lib/CMakeLists.txt6
-rw-r--r--filedistribution/src/tests/lib/mock-zookeeper.cpp327
-rw-r--r--filedistribution/src/tests/rpc/.gitignore1
-rw-r--r--filedistribution/src/tests/rpc/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/rpc/mockfileprovider.h49
-rw-r--r--filedistribution/src/tests/rpc/testfileprovider.cpp66
-rw-r--r--filedistribution/src/tests/scheduler/.gitignore1
-rw-r--r--filedistribution/src/tests/scheduler/CMakeLists.txt16
-rw-r--r--filedistribution/src/tests/scheduler/test-scheduler.cpp109
-rw-r--r--filedistribution/src/tests/status/.gitignore1
-rw-r--r--filedistribution/src/tests/status/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/status/test-status.cpp16
-rw-r--r--filedistribution/src/tests/zkfacade/.gitignore1
-rw-r--r--filedistribution/src/tests/zkfacade/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/zkfacade/test-zkfacade.cpp228
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/.gitignore1
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt16
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp88
-rw-r--r--filedistribution/src/vespa/.gitignore3
-rw-r--r--filedistribution/src/vespa/filedistribution/common/CMakeLists.txt15
-rw-r--r--filedistribution/src/vespa/filedistribution/common/buffer.h93
-rw-r--r--filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp81
-rw-r--r--filedistribution/src/vespa/filedistribution/common/componentsdeleter.h73
-rw-r--r--filedistribution/src/vespa/filedistribution/common/concurrentqueue.h56
-rw-r--r--filedistribution/src/vespa/filedistribution/common/exception.cpp8
-rw-r--r--filedistribution/src/vespa/filedistribution/common/exception.h13
-rw-r--r--filedistribution/src/vespa/filedistribution/common/logfwd.h22
-rw-r--r--filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp44
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt13
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp202
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h39
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp460
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.h86
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp136
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h67
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/hostname.cpp25
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/hostname.h13
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp45
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/scheduler.h46
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp40
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/signalhandling.h15
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp7
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h21
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/.gitignore5
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt17
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp87
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/createtorrent.h23
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/field.h46
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedb.cpp61
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedb.h25
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp188
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/jnistring.h78
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp24
-rw-r--r--filedistribution/src/vespa/filedistribution/model/CMakeLists.txt18
-rw-r--r--filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp149
-rw-r--r--filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h54
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedbmodel.h60
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h44
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp231
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h68
-rw-r--r--filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h53
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfacade.cpp605
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfacade.h147
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp301
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h55
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt6
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp267
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h29
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/fileprovider.h36
88 files changed, 10 insertions, 6204 deletions
diff --git a/filedistribution/CMakeLists.txt b/filedistribution/CMakeLists.txt
deleted file mode 100644
index 8baee25a192..00000000000
--- a/filedistribution/CMakeLists.txt
+++ /dev/null
@@ -1,39 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_define_module(
- DEPENDS
- fastos
- vespalog
- fnet
- frtstream
- configdefinitions
- config_cloudconfig
- staging_vespalib
- vespadefaults
- vespalib
- fileacquirer
-
- EXTERNAL_DEPENDS
- torrent-rasterbar
-
- LIBS
- src/vespa/filedistribution/rpc
- src/vespa/filedistribution/common
- src/vespa/filedistribution/manager
- src/vespa/filedistribution/distributor
- src/vespa/filedistribution/model
-
- TESTS
- src/tests/zkfacade
- src/tests/zkfiledbmodel
- src/tests/filedbmodelimpl
- src/tests/status
- src/tests/scheduler
- src/tests/rpc
- src/tests/common
- src/tests/filedownloader
- src/tests/lib
-
- APPS
- src/apps/status
- src/apps/filedistributor
-)
diff --git a/filedistribution/src/apps/.gitignore b/filedistribution/src/apps/.gitignore
deleted file mode 100644
index f3c7a7c5da6..00000000000
--- a/filedistribution/src/apps/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-Makefile
diff --git a/filedistribution/src/apps/filedistributor/.gitignore b/filedistribution/src/apps/filedistributor/.gitignore
deleted file mode 100644
index 89aaa70637e..00000000000
--- a/filedistribution/src/apps/filedistributor/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-/filedistributor
-vespa-filedistributor-bin
diff --git a/filedistribution/src/apps/filedistributor/CMakeLists.txt b/filedistribution/src/apps/filedistributor/CMakeLists.txt
deleted file mode 100644
index 22ffcddcd6f..00000000000
--- a/filedistribution/src/apps/filedistributor/CMakeLists.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_filedistributor_app
- SOURCES
- filedistributor.cpp
- OUTPUT_NAME vespa-filedistributor-bin
- INSTALL sbin
- DEPENDS
- filedistribution_distributor
- filedistribution_filedistributionmodel
- filedistribution_filedistributorrpc
- filedistribution_common
-)
-target_compile_options(filedistribution_filedistributor_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp
deleted file mode 100644
index f2bf5fa0ccd..00000000000
--- a/filedistribution/src/apps/filedistributor/filedistributor.cpp
+++ /dev/null
@@ -1,375 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/config-filedistributor.h>
-#include <vespa/config-filereferences.h>
-
-#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h>
-#include <vespa/filedistribution/distributor/filedownloadermanager.h>
-#include <vespa/filedistribution/distributor/signalhandling.h>
-#include <vespa/filedistribution/distributor/state_server_impl.h>
-
-#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
-#include <vespa/filedistribution/rpc/filedistributorrpc.h>
-#include <vespa/filedistribution/common/componentsdeleter.h>
-#include <vespa/fileacquirer/config-filedistributorrpc.h>
-#include <vespa/config/common/exceptions.h>
-#include <vespa/config-zookeepers.h>
-#include <vespa/fastos/app.h>
-#include <boost/program_options.hpp>
-
-namespace {
-const char* programName = "filedistributor";
-}
-
-#include <vespa/log/log.h>
-LOG_SETUP(programName);
-
-using namespace std::literals;
-
-using namespace filedistribution;
-using cloud::config::ZookeepersConfig;
-using cloud::config::filedistribution::FiledistributorConfig;
-using cloud::config::filedistribution::FiledistributorrpcConfig;
-
-class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
- public config::IFetcherCallback<FiledistributorConfig>,
- public config::IFetcherCallback<FiledistributorrpcConfig>,
- public config::IGenerationCallback
-{
- class Components {
- ComponentsDeleter _componentsDeleter;
- public:
- const std::shared_ptr<ZKFacade> _zk;
- const std::shared_ptr<FileDistributionModelImpl> _model;
- const std::shared_ptr<FileDistributorTrackerImpl> _tracker;
- const std::shared_ptr<FileDownloader> _downloader;
- const FileDownloaderManager::SP _manager;
- const FileDistributorRPC::SP _rpcHandler;
- const std::shared_ptr<StateServerImpl> _stateServer;
-
- private:
- class GuardedThread {
- public:
- GuardedThread(const GuardedThread &) = delete;
- GuardedThread & operator = (const GuardedThread &) = delete;
- GuardedThread(const std::shared_ptr<FileDownloader> & downloader) :
- _downloader(downloader),
- _thread([downloader=_downloader] () { downloader->runEventLoop(); })
- { }
- ~GuardedThread() {
- _downloader->close();
- if (_thread.joinable()) {
- _thread.join();
- }
- if ( !_downloader->drained() ) {
- LOG(error, "The filedownloader did not drain fully. We will just exit quickly and let a restart repair it for us.");
- std::quick_exit(67);
- }
- }
- private:
- std::shared_ptr<FileDownloader> _downloader;
- std::thread _thread;
- };
- std::unique_ptr<GuardedThread> _downloaderEventLoopThread;
- config::ConfigFetcher _configFetcher;
-
- template <class T>
- typename std::shared_ptr<T> track(T* component) {
- return _componentsDeleter.track(component);
- }
-
- public:
- Components(const Components &) = delete;
- Components & operator = (const Components &) = delete;
-
- Components(const config::ConfigUri & configUri,
- const ZookeepersConfig& zooKeepersConfig,
- const FiledistributorConfig& fileDistributorConfig,
- const FiledistributorrpcConfig& rpcConfig)
- :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, false))),
- _model(track(new FileDistributionModelImpl(fileDistributorConfig.hostname, fileDistributorConfig.torrentport, _zk))),
- _tracker(track(new FileDistributorTrackerImpl(_model))),
- _downloader(track(new FileDownloader(_tracker, fileDistributorConfig.hostname, fileDistributorConfig.torrentport, Path(fileDistributorConfig.filedbpath)))),
- _manager(track(new FileDownloaderManager(_downloader, _model))),
- _rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))),
- _stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))),
- _downloaderEventLoopThread(),
- _configFetcher(configUri.getContext())
- {
- _downloaderEventLoopThread = std::make_unique<GuardedThread>(_downloader);
- _manager->start();
- _rpcHandler->start();
-
- _tracker->setDownloader(_downloader);
- _configFetcher.subscribe<FilereferencesConfig>(configUri.getConfigId(), _model.get());
- _configFetcher.start();
- updatedConfig(_configFetcher.getGeneration());
- }
-
- void updatedConfig(int64_t generation) {
- vespalib::ComponentConfigProducer::Config curr("filedistributor", generation);
- _stateServer->myComponents.addConfig(curr);
- }
-
- ~Components() {
- _configFetcher.close();
- //Do not waste time retrying zookeeper operations when going down.
- _zk->disableRetries();
-
- _downloaderEventLoopThread.reset();
- }
-
- };
-
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _configMutex;
-
- bool _completeReconfigurationNeeded;
- std::unique_ptr<ZookeepersConfig> _zooKeepersConfig;
- std::unique_ptr<FiledistributorConfig> _fileDistributorConfig;
- std::unique_ptr<FiledistributorrpcConfig> _rpcConfig;
-
- std::unique_ptr<Components> _components;
-public:
- FileDistributor(const FileDistributor &) = delete;
- FileDistributor & operator = (const FileDistributor &) = delete;
- FileDistributor();
- ~FileDistributor();
-
- void notifyGenerationChange(int64_t generation) override {
- if (_components && ! completeReconfigurationNeeded()) {
- _components->updatedConfig(generation);
- }
- }
-
- //configure overrides
- void configure(std::unique_ptr<ZookeepersConfig> config) override {
- LockGuard guard(_configMutex);
- _zooKeepersConfig = std::move(config);
- _completeReconfigurationNeeded = true;
- }
-
- void configure(std::unique_ptr<FiledistributorConfig> config) override {
- LockGuard guard(_configMutex);
- if (_fileDistributorConfig.get() != NULL &&
- (config->torrentport != _fileDistributorConfig->torrentport ||
- config->stateport != _fileDistributorConfig->stateport ||
- config->hostname != _fileDistributorConfig->hostname ||
- config->filedbpath != _fileDistributorConfig->filedbpath))
- {
- _completeReconfigurationNeeded = true;
- } else if (_components.get()) {
- configureSpeedLimits(*config);
- }
- _fileDistributorConfig = std::move(config);
-
- }
-
- void configure(std::unique_ptr<FiledistributorrpcConfig> config) override {
- LockGuard guard(_configMutex);
- _rpcConfig = std::move(config);
- _completeReconfigurationNeeded = true;
- }
-
- void run(const config::ConfigUri & configUri) {
- while (!askedToShutDown()) {
- clearReinitializeFlag();
- runImpl(configUri);
- }
- }
-
- bool isConfigComplete() {
- LockGuard guard(_configMutex);
- return (_zooKeepersConfig && _fileDistributorConfig && _rpcConfig);
- }
- void createComponents(const config::ConfigUri & configUri) {
- LockGuard guard(_configMutex);
- _components.reset(
- new Components(configUri,
- *_zooKeepersConfig,
- *_fileDistributorConfig,
- *_rpcConfig));
-
- configureSpeedLimits(*_fileDistributorConfig);
- _completeReconfigurationNeeded = false;
- }
-
- bool completeReconfigurationNeeded() {
- LockGuard guard(_configMutex);
- if (_completeReconfigurationNeeded) {
- LOG(debug, "Complete reconfiguration needed");
- }
- return _completeReconfigurationNeeded;
- }
-
- void configureSpeedLimits(const FiledistributorConfig& config) {
- FileDownloader& downloader = *_components->_downloader;
- downloader.setMaxDownloadSpeed(config.maxdownloadspeed);
- downloader.setMaxUploadSpeed(config.maxuploadspeed);
- }
-
- void runImpl(const config::ConfigUri & configUri) {
- createComponents(configUri);
-
- // We do not want back to back reinitializing as it gives zero time for serving
- // some torrents.
- int postPoneAskedToReinitializedSecs = 50;
-
- while (!askedToShutDown() &&
- (postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) &&
- !completeReconfigurationNeeded())
- {
- postPoneAskedToReinitializedSecs--;
- std::this_thread::sleep_for(1s);
- }
- _components.reset();
- }
-};
-
-FileDistributor::FileDistributor()
- : _configMutex(),
- _completeReconfigurationNeeded(false),
- _zooKeepersConfig(),
- _fileDistributorConfig(),
- _rpcConfig(),
- _components()
-{ }
-FileDistributor::~FileDistributor() { }
-
-class FileDistributorApplication : public FastOS_Application {
- const config::ConfigUri _configUri;
-public:
- FileDistributorApplication(const config::ConfigUri & configUri);
-
- int Main() override;
-};
-
-namespace {
-
-struct ProgramOptionException {
- std::string _msg;
- ProgramOptionException(const std::string & msg)
- : _msg(msg)
- {}
-};
-
-bool exists(const std::string& optionName, const boost::program_options::variables_map& map) {
- return map.find(optionName) != map.end();
-}
-
-void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map ) {
- if (!exists(optionName, map)) {
- throw ProgramOptionException("Error: Missing option " + optionName);
- }
-}
-
-} //anonymous namespace
-
-FileDistributorApplication::FileDistributorApplication(const config::ConfigUri & configUri)
- :_configUri(configUri) {
-}
-
-int
-FileDistributorApplication::Main() {
- try {
- FileDistributor distributor;
-
- config::ConfigFetcher configFetcher(_configUri.getContext());
- configFetcher.subscribe<ZookeepersConfig>(_configUri.getConfigId(), &distributor);
- configFetcher.subscribe<FiledistributorConfig>(_configUri.getConfigId(), &distributor);
- configFetcher.subscribe<FiledistributorrpcConfig>(_configUri.getConfigId(), &distributor);
- configFetcher.subscribeGenerationChanges(&distributor);
- configFetcher.start();
-
- while (! distributor.isConfigComplete() ) {
- std::this_thread::sleep_for(10ms);
- }
- distributor.run(_configUri);
-
- EV_STOPPING(programName, "Clean exit");
- return 0;
- } catch (const FileDoesNotExistException & e) {
- EV_STOPPING(programName, e.what());
- return 1;
- } catch (const ZKNodeDoesNotExistsException & e) {
- EV_STOPPING(programName, e.what());
- return 2;
- } catch (const ZKSessionExpired & e) {
- EV_STOPPING(programName, e.what());
- return 3;
- } catch (const config::ConfigTimeoutException & e) {
- EV_STOPPING(programName, e.what());
- return 4;
- } catch (const vespalib::PortListenException & e) {
- EV_STOPPING(programName, e.what());
- return 5;
- } catch (const ZKConnectionLossException & e) {
- EV_STOPPING(programName, e.what());
- return 6;
- } catch (const ZKFailedConnecting & e) {
- EV_STOPPING(programName, e.what());
- return 7;
- } catch (const config::InvalidConfigException & e) {
- EV_STOPPING(programName, e.what());
- return 8;
- } catch (const ZKOperationTimeoutException & e) {
- EV_STOPPING(programName, e.what());
- return 9;
- } catch (const ZKGenericException & e) {
- EV_STOPPING(programName, e.what());
- return 99;
- }
-}
-
-int
-executeApplication(int argc, char** argv) {
- const char
- *configId("configid"),
- *help("help");
-
- namespace po = boost::program_options;
- po::options_description description;
- description.add_options()
- (configId, po::value<std::string > (), "id to request config for")
- (help, "help");
-
- try {
- po::variables_map values;
- po::store(po::parse_command_line(argc, argv, description), values);
-
- if (exists(help, values)) {
- std::cout <<description;
- return 0;
- }
- ensureExists(configId, values);
-
- FileDistributorApplication application(values[configId].as<std::string > ());
- return application.Entry(argc, argv);
-
- } catch(ProgramOptionException& e) {
- std::cerr <<e._msg <<std::endl;
- return -1;
- }
-}
-
-namespace {
-
-class InitSignals {
-public:
- InitSignals() { initSignals(); }
-};
-
-InitSignals _G_initSignals __attribute__ ((init_priority (101)));
-
-}
-
-int
-main(int argc, char** argv) {
- if (askedToShutDown()) { return 0; }
- EV_STARTED(programName);
-
- std::srand(std::time(0));
- filedistribution::ZKLogging loggingGuard;
-
- return executeApplication(argc, argv);
-}
diff --git a/filedistribution/src/apps/status/.gitignore b/filedistribution/src/apps/status/.gitignore
deleted file mode 100644
index 2105a3c7051..00000000000
--- a/filedistribution/src/apps/status/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-/vespa-status-filedistribution-bin
-filedistribution_status-filedistribution_app
diff --git a/filedistribution/src/apps/status/CMakeLists.txt b/filedistribution/src/apps/status/CMakeLists.txt
deleted file mode 100644
index e666bc2cecc..00000000000
--- a/filedistribution/src/apps/status/CMakeLists.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_status-filedistribution_app
- SOURCES
- status-filedistribution.cpp
- INSTALL bin
- OUTPUT_NAME vespa-status-filedistribution-bin
- DEPENDS
- filedistribution_filedistributionmodel
- filedistribution_common
-)
-target_compile_options(filedistribution_status-filedistribution_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_install_script(vespa-status-filedistribution.sh vespa-status-filedistribution bin)
diff --git a/filedistribution/src/apps/status/status-filedistribution.cpp b/filedistribution/src/apps/status/status-filedistribution.cpp
deleted file mode 100644
index 52cabdfcb2d..00000000000
--- a/filedistribution/src/apps/status/status-filedistribution.cpp
+++ /dev/null
@@ -1,180 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/filedistribution/model/zkfacade.h>
-#include <vespa/filedistribution/model/filedistributionmodel.h>
-#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
-#include <zookeeper/zookeeper.h>
-#include <boost/program_options.hpp>
-
-#include <vespa/log/log.h>
-#include <iostream>
-#include <map>
-#include <thread>
-
-LOG_SETUP("status-filedistribution");
-
-using namespace filedistribution;
-using namespace std::literals;
-namespace po = boost::program_options;
-
-std::string
-plural(size_t size)
-{
- if (size == 1)
- return "";
- else
- return "s";
-}
-
-template <class CONT>
-std::string
-plural(const CONT& cont)
-{
- size_t size = cont.size();
- return plural(size);
-}
-
-typedef FileDBModel::HostStatus HostStatus;
-typedef std::map<std::string, HostStatus> StatusByHostName;
-
-void
-printWaitingForHosts(const StatusByHostName& notFinishedHosts)
-{
- std::cout <<"Waiting for the following host" <<plural(notFinishedHosts) <<":" <<std::endl;
- for (const StatusByHostName::value_type & hostNameAndStatus : notFinishedHosts) {
- std::cout <<hostNameAndStatus.first <<" (";
-
- const HostStatus& hostStatus = hostNameAndStatus.second;
- if (hostStatus._state == HostStatus::notStarted) {
- std::cout <<"Not started";
- } else {
- std::cout <<"Downloading, "
- <<hostStatus._numFilesFinished <<"/" <<hostStatus._numFilesToDownload
- <<" file" <<plural(hostStatus._numFilesToDownload) <<" completed";
- }
- std::cout <<")" <<std::endl;
- }
-}
-
-//TODO:refactor
-int printStatus(const std::string& zkservers)
-{
- std::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers, true));
-
- std::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk));
-
- std::vector<std::string> hosts = model->getHosts();
-
- StatusByHostName notFinishedHosts;
- StatusByHostName finishedHosts;
- bool hasStarted = false;
-
- for (const std::string & host : hosts) {
- HostStatus hostStatus = model->getHostStatus(host);
- switch (hostStatus._state) {
- case HostStatus::finished:
- hasStarted = true;
- finishedHosts[host] = hostStatus;
- break;
- case HostStatus::inProgress:
- hasStarted = true;
- //@fallthrough@
- case HostStatus::notStarted:
- notFinishedHosts[host] = hostStatus;
- break;
- }
- }
-
- if (notFinishedHosts.empty()) {
- std::cout <<"Finished distributing files to all hosts." <<std::endl;
- return 0;
- } else if (!hasStarted) {
- std::cout <<"File distribution has not yet started." <<std::endl;
- return 0;
- } else {
- printWaitingForHosts(notFinishedHosts);
- return 5;
- }
-}
-
-int
-printStatusRetryIfZKProblem(const std::string& zkservers, const std::string& zkLogFile)
-{
- FILE* file = fopen(zkLogFile.c_str(), "w");
- if (file == NULL) {
- std::cerr <<"Could not open file " <<zkLogFile <<std::endl;
- } else {
- zoo_set_log_stream(file);
- }
- zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
-
- for (int i = 0; i < 5; ++i) {
- try {
- return printStatus(zkservers);
- } catch (ZKNodeDoesNotExistsException& e) {
- LOG(debug, "Node does not exists, assuming concurrent update. %s", e.what());
-
- } catch (ZKSessionExpired& e) {
- LOG(debug, "Session expired.");
- }
- std::this_thread::sleep_for(500ms);
- }
- return 4;
-}
-
-
-//TODO: move to common
-struct ProgramOptionException {
- std::string _msg;
- ProgramOptionException(const std::string & msg)
- : _msg(msg)
- {}
-};
-
-bool exists(const std::string& optionName, const po::variables_map& map) {
- return map.find(optionName) != map.end();
-}
-
-void ensureExists(const std::string& optionName, const po::variables_map& map) {
- if (!exists(optionName, map)) {
- throw ProgramOptionException("Error: Missing option " + optionName);
- }
-}
-//END: move to common
-
-
-int
-main(int argc, char** argv) {
- const char
- *zkstring = "zkstring",
- *zkLogFile = "zkLogFile",
- *help = "help";
-
- po::options_description description;
- description.add_options()
- (zkstring, po::value<std::string > (), "The zookeeper servers to connect to, separated by comma")
- (zkLogFile, po::value<std::string >() -> default_value("/dev/null"), "Zookeeper log file")
- (help, "help");
-
- try {
- po::variables_map values;
- po::store(po::parse_command_line(argc, argv, description), values);
-
- if (exists(help, values)) {
- std::cout <<description;
- return 0;
- }
-
- ensureExists(zkstring, values);
-
- return printStatusRetryIfZKProblem(
- values[zkstring] .as<std::string>(),
- values[zkLogFile].as<std::string>());
- } catch (const ProgramOptionException& e) {
- std::cerr <<e._msg <<std::endl;
- return 3;
- } catch(const std::exception& e) {
- std::cerr <<"Error: " <<e.what() <<std::endl;
- return 3;
- }
-}
diff --git a/filedistribution/src/apps/status/vespa-status-filedistribution.sh b/filedistribution/src/main/sh/vespa-status-filedistribution.sh
index 65a6da89b56..119999c9191 100644
--- a/filedistribution/src/apps/status/vespa-status-filedistribution.sh
+++ b/filedistribution/src/main/sh/vespa-status-filedistribution.sh
@@ -60,22 +60,15 @@ findroot
ROOT=${VESPA_HOME%/}
-if [ "$cloudconfig_server__disable_filedistributor" = "" ] || [ "$cloudconfig_server__disable_filedistributor" != "true" ]; then
- ZKSTRING=$($ROOT/libexec/vespa/vespa-config.pl -zkstring)
- test -z "$VESPA_LOG_LEVEL" && VESPA_LOG_LEVEL=warning
- export VESPA_LOG_LEVEL
- exec $ROOT/bin/vespa-status-filedistribution-bin --zkstring "$ZKSTRING" $@
-else
- if [ "$cloudconfig_server__environment" != "" ]; then
- environment="--environment $cloudconfig_server__environment"
- fi
- if [ "$cloudconfig_server__region" != "" ]; then
- region="--region $cloudconfig_server__region"
- fi
+if [ "$cloudconfig_server__environment" != "" ]; then
+ environment="--environment $cloudconfig_server__environment"
+fi
+if [ "$cloudconfig_server__region" != "" ]; then
+ region="--region $cloudconfig_server__region"
+fi
- defaults="--tenant default --application default --instance default"
- jvmoptions="-XX:MaxJavaStackTraceDepth=-1 $(getJavaOptionsIPV46) -Xms48m -Xmx48m"
- jar="-cp $VESPA_HOME/lib/jars/filedistribution-jar-with-dependencies.jar"
+defaults="--tenant default --application default --instance default"
+jvmoptions="-XX:MaxJavaStackTraceDepth=-1 $(getJavaOptionsIPV46) -Xms48m -Xmx48m"
+jar="-cp $VESPA_HOME/lib/jars/filedistribution-jar-with-dependencies.jar"
- exec java $jvmoptions $jar com.yahoo.vespa.filedistribution.status.FileDistributionStatusClient $defaults $environment $region "$@"
-fi
+exec java $jvmoptions $jar com.yahoo.vespa.filedistribution.status.FileDistributionStatusClient $defaults $environment $region "$@"
diff --git a/filedistribution/src/tests/.gitignore b/filedistribution/src/tests/.gitignore
deleted file mode 100644
index ee50e13466c..00000000000
--- a/filedistribution/src/tests/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-Makefile
-*_test
diff --git a/filedistribution/src/tests/common/.gitignore b/filedistribution/src/tests/common/.gitignore
deleted file mode 100644
index 060721ea295..00000000000
--- a/filedistribution/src/tests/common/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_common_test_app
diff --git a/filedistribution/src/tests/common/CMakeLists.txt b/filedistribution/src/tests/common/CMakeLists.txt
deleted file mode 100644
index 9f142eec9e7..00000000000
--- a/filedistribution/src/tests/common/CMakeLists.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_common_test_app TEST
- SOURCES
- testCommon.cpp
- DEPENDS
-)
-target_compile_options(filedistribution_common_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_common_test_app NO_VALGRIND COMMAND filedistribution_common_test_app)
diff --git a/filedistribution/src/tests/common/testCommon.cpp b/filedistribution/src/tests/common/testCommon.cpp
deleted file mode 100644
index 8ad17b6b7ce..00000000000
--- a/filedistribution/src/tests/common/testCommon.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-
-#include <vespa/filedistribution/common/buffer.h>
-
-#include <boost/test/unit_test.hpp>
-#include <string>
-
-namespace fd = filedistribution;
-
-const size_t bufferCapacity = 10;
-
-fd::Buffer
-getBuffer() {
- const char* test = "test";
- fd::Buffer buffer(test, test + strlen(test));
- buffer.reserve(bufferCapacity);
- buffer.push_back(0);
- return buffer;
-}
-
-BOOST_AUTO_TEST_CASE(bufferTest) {
- fd::Buffer buffer(getBuffer());
- BOOST_CHECK(buffer.begin() != 0);
- BOOST_CHECK_EQUAL(bufferCapacity, buffer.capacity());
- BOOST_CHECK_EQUAL(5u, buffer.size());
- BOOST_CHECK_EQUAL(std::string("test"), buffer.begin());
-}
-
-struct Callback {
- bool* _called;
- Callback(bool *called)
- :_called(called)
- {}
-
- void operator()(const std::string& str) {
- BOOST_CHECK_EQUAL("abcd", str);
- *_called = true;
- }
-};
diff --git a/filedistribution/src/tests/filedbmodelimpl/.gitignore b/filedistribution/src/tests/filedbmodelimpl/.gitignore
deleted file mode 100644
index 05a25df1258..00000000000
--- a/filedistribution/src/tests/filedbmodelimpl/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_filedbmodelimpl_test_app
diff --git a/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt b/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt
deleted file mode 100644
index a97cc9e4c7a..00000000000
--- a/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_filedbmodelimpl_test_app TEST
- SOURCES
- test-filedistributionmodelimpl.cpp
- DEPENDS
- filedistribution_filedistributionmodel
- filedistribution_common
- filedistribution_mocks
-)
-target_compile_options(filedistribution_filedbmodelimpl_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_filedbmodelimpl_test_app NO_VALGRIND COMMAND filedistribution_filedbmodelimpl_test_app)
diff --git a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
deleted file mode 100644
index 1de7fc817ae..00000000000
--- a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-#define BOOST_TEST_MODULE filedbmodelimpl test
-#include <boost/test/unit_test.hpp>
-
-#include <iostream>
-#include <vector>
-#include <vespa/filedistribution/common/componentsdeleter.h>
-#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
-#include <vespa/filedistribution/model/zkfacade.h>
-#include <zookeeper/zookeeper.h>
-
-
-using namespace filedistribution;
-
-
-namespace {
-
-
-struct Fixture {
- ComponentsDeleter _componentsDeleter;
- std::shared_ptr<ZKFacade> _zk;
- std::shared_ptr<FileDistributionModelImpl> _distModel;
- Fixture() {
- _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", false));
- _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk));
- }
- ~Fixture() { }
-};
-
-} //anonymous namespace
-
-
-BOOST_FIXTURE_TEST_SUITE(FileDistributionModelImplTests, Fixture)
-
-BOOST_AUTO_TEST_CASE(configServersAsPeers)
-{
- std::vector<std::string> peers;
- peers.push_back("old");
- peers.push_back("config:123");
- peers.push_back("config:567");
- peers.push_back("foo:123");
- _distModel->addConfigServersAsPeers(peers, "config,configTwo", 123);
- BOOST_CHECK(peers.size() == 5);
- BOOST_CHECK(peers[4] == "configTwo:123");
- _distModel->addConfigServersAsPeers(peers, NULL, 123);
- BOOST_CHECK(peers.size() == 5);
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/tests/filedownloader/.gitignore b/filedistribution/src/tests/filedownloader/.gitignore
deleted file mode 100644
index f7bb72b4791..00000000000
--- a/filedistribution/src/tests/filedownloader/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_filedownloader_test_app
diff --git a/filedistribution/src/tests/filedownloader/CMakeLists.txt b/filedistribution/src/tests/filedownloader/CMakeLists.txt
deleted file mode 100644
index 7bdfeda3525..00000000000
--- a/filedistribution/src/tests/filedownloader/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_filedownloader_test_app TEST
- SOURCES
- testfiledownloader.cpp
- DEPENDS
- filedistribution_filedistributionmodel
- filedistribution_common
-)
-target_compile_options(filedistribution_filedownloader_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_filedownloader_test_app NO_VALGRIND COMMAND filedistribution_filedownloader_test_app)
diff --git a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
deleted file mode 100644
index c03c6ca5e6c..00000000000
--- a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-
-#include <vespa/filedistribution/distributor/filedownloader.h>
-#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h>
-
-#include <fstream>
-
-#include <boost/test/unit_test.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/filesystem/fstream.hpp>
-
-#include <vespa/filedistribution/manager/createtorrent.h>
-#include <vespa/filedistribution/common/componentsdeleter.h>
-
-namespace fs = boost::filesystem;
-
-using namespace filedistribution;
-
-namespace {
-const std::string localHost("localhost");
-const int uploaderPort = 9113;
-const int downloaderPort = 9112;
-
-#if 0
-std::shared_ptr<FileDownloader>
-createDownloader(ComponentsDeleter& deleter,
- int port, const fs::path& downloaderPath,
- const std::shared_ptr<FileDistributionModel>& model)
-{
- std::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model)));
- std::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker,
- localHost, port, downloaderPath)));
-
- tracker->setDownloader(downloader);
- return downloader;
-}
-#endif
-
-} //anonymous namespace
-
-class MockFileDistributionModel : public FileDistributionModel {
- FileDBModel& getFileDBModel() override {
- abort();
- }
-
- std::set<std::string> getFilesToDownload() override {
- return std::set<std::string>();
- }
-
- PeerEntries getPeers(const std::string& , size_t) override {
- PeerEntries peers(2);
- peers[0].ip = localHost;
- peers[0].port = uploaderPort;
-
- peers[1].ip = localHost;
- peers[1].port = downloaderPort;
-
- return peers;
- }
-
- void addPeer(const std::string&) override {}
- void removePeer(const std::string&) override {}
- void peerFinished(const std::string&) override {}
-};
-
-
-#if 0
-BOOST_AUTO_TEST_CASE(fileDownloaderTest) {
- fs::path testPath = "/tmp/filedownloadertest";
- fs::remove_all(testPath);
-
- fs::path downloaderPath = testPath / "downloader";
- fs::path uploaderPath = testPath / "uploader";
-
- const std::string fileReference = "0123456789012345678901234567890123456789";
- const std::string fileToSend = "filetosend.txt";
-
- fs::create_directories(downloaderPath);
- fs::create_directories(uploaderPath / fileReference);
-
- fs::path fileToUploadPath = uploaderPath / fileReference / "filetosend.txt";
-
- {
- fs::ofstream stream(fileToUploadPath);
- stream <<"Hello, world!" <<std::endl;
- }
-
- CreateTorrent createTorrent(fileToUploadPath);
- Buffer buffer(createTorrent.bencode());
-
- ComponentsDeleter deleter;
-
- std::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel()));
- std::shared_ptr<FileDownloader> downloader =
- createDownloader(deleter, downloaderPort, downloaderPath, model);
-
- std::shared_ptr<FileDownloader> uploader =
- createDownloader(deleter, uploaderPort, uploaderPath, model);
-
- std::thread uploaderThread( [uploader] () { uploader->runEventLoop(); });
- std::thread downloaderThread( [downloader] () { downloader->runEventLoop(); });
-
- uploader->addTorrent(fileReference, buffer);
- downloader->addTorrent(fileReference, buffer);
-
- sleep(5);
- BOOST_CHECK(fs::exists(downloaderPath / fileReference / fileToSend));
-
- uploaderThread.interrupt();
- uploaderThread.join();
-
- downloaderThread.interrupt();
- downloaderThread.join();
-
- fs::remove_all(testPath);
-}
-#endif
-
-//TODO: cleanup
-libtorrent::sha1_hash
-toInfoHash(const std::string& fileReference) {
- assert (fileReference.size() == 40);
- std::istringstream s(fileReference);
-
- libtorrent::sha1_hash infoHash;
- s >> infoHash;
- return infoHash;
-}
-
-BOOST_AUTO_TEST_CASE(test_filereference_infohash_conversion) {
- const std::string fileReference = "3a281c905c9b6ebe4d969037a198454fedefbdf3";
-
- libtorrent::sha1_hash infoHash = toInfoHash(fileReference);
-
- std::ostringstream fileReferenceString;
- fileReferenceString <<infoHash;
-
- BOOST_CHECK(fileReference == fileReferenceString.str());
-
- std::cout <<fileReference <<std::endl <<fileReferenceString.str() <<std::endl;
-}
diff --git a/filedistribution/src/tests/lib/CMakeLists.txt b/filedistribution/src/tests/lib/CMakeLists.txt
deleted file mode 100644
index 735a735e1ba..00000000000
--- a/filedistribution/src/tests/lib/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(filedistribution_mocks STATIC
- SOURCES
- mock-zookeeper.cpp
- DEPENDS
-)
diff --git a/filedistribution/src/tests/lib/mock-zookeeper.cpp b/filedistribution/src/tests/lib/mock-zookeeper.cpp
deleted file mode 100644
index 5416afdc1fe..00000000000
--- a/filedistribution/src/tests/lib/mock-zookeeper.cpp
+++ /dev/null
@@ -1,327 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <zookeeper/zookeeper.h>
-
-#include <string>
-#include <map>
-#include <cassert>
-#include <cstring>
-#include <vector>
-
-#include <thread>
-#include <atomic>
-#include <boost/lexical_cast.hpp>
-
-#include <iostream>
-
-#include <vespa/filedistribution/common/concurrentqueue.h>
-
-using std::map;
-using std::string;
-using std::vector;
-using std::pair;
-using std::make_pair;
-using filedistribution::ConcurrentQueue;
-
-namespace {
-std::pair<string, string> parentPathAndChildName(const string& childPath)
-{
- if (childPath.empty()) {
- return std::make_pair("", "");
- } else {
- assert (childPath[0] == '/');
-
- size_t index = childPath.find_last_of("/");
- return std::make_pair(childPath.substr(0, index), childPath.substr(index + 1));
- }
-}
-
-struct Node {
- typedef map<string, Node> Children;
- Children children;
- bool exists;
- bool ephemeral;
- vector<char> buffer;
- vector<pair<watcher_fn, void*> > watchers;
-
- Node()
- :exists(false),
- ephemeral(false)
- {}
-
- void addWatcher(watcher_fn fn, void* context) {
- if (fn)
- watchers.push_back(make_pair(fn, context));
- }
-
- void triggerWatches(zhandle_t* zh, const std::string& path);
-};
-
-std::shared_ptr<Node> sharedRoot;
-
-void doNothing() { }
-
-struct ZHandle {
- struct Worker {
- ZHandle& zhandle;
-
- Worker(ZHandle* parent) : zhandle(*parent) {}
-
- void operator()();
- };
-
- int sequence;
-
- std::shared_ptr<Node> root;
- std::atomic<bool> _closed;
- std::thread _watchersThread;
- vector<string> ephemeralNodes;
-
- typedef std::function<void (void)> InvokeWatcherFun;
- ConcurrentQueue<InvokeWatcherFun> watcherInvocations;
-
- Node& getNode(const string& path);
-
- Node& getParent(const string& path);
-
- void ephemeralNode(const string&path) {
- ephemeralNodes.push_back(path);
- }
-
- ZHandle() : sequence(0), _closed(false), _watchersThread(Worker(this)) {
- if (!sharedRoot)
- sharedRoot.reset(new Node());
-
- root = sharedRoot;
- }
-
- ~ZHandle() {
- std::for_each(ephemeralNodes.begin(), ephemeralNodes.end(),
- [this] (const string & s) { zoo_delete((zhandle_t*)this, s.c_str(), 0); });
- close();
- _watchersThread.join();
- }
- void close() {
- _closed.store(true);
- watcherInvocations.push(std::ref(doNothing));
- }
-};
-
-void
-ZHandle::Worker::operator()()
-{
- while (! zhandle._closed.load()) {
- InvokeWatcherFun fun = zhandle.watcherInvocations.pop();
- fun();
- }
-}
-
-Node& ZHandle::getNode(const string& path) {
- auto splittedPath = parentPathAndChildName(path);
- if (splittedPath.second.empty()) {
- return *root;
- } else {
- return getNode(splittedPath.first).children[splittedPath.second];
- }
-}
-
-Node&
-ZHandle::getParent(const string& childPath)
-{
- auto splittedPath = parentPathAndChildName(childPath);
- if (splittedPath.second.empty()) {
- throw "Can't get parent of root.";
- } else {
- return getNode(splittedPath.first);
- }
-}
-
-void
-Node::triggerWatches(zhandle_t* zh, const std::string& path) {
- for (auto i = watchers.begin(); i != watchers.end(); ++i) {
- ((ZHandle*)zh)->watcherInvocations.push([zh, i, path] () { i->first(zh, 0, 0, path.c_str(), i->second); });
- }
- watchers.clear();
-}
-
-} //anonymous namespace
-
-extern "C" {
-
-ZOOAPI void zoo_set_debug_level(ZooLogLevel) {}
-ZOOAPI zhandle_t *zookeeper_init(const char * host, watcher_fn fn,
- int recv_timeout, const clientid_t *clientid, void *context, int flags)
-{
- (void)host;
- (void)fn;
- (void)recv_timeout;
- (void)clientid;
- (void)context;
- (void)flags;
-
- return (zhandle_t*)new ZHandle;
-}
-
-ZOOAPI int zookeeper_close(zhandle_t *zh)
-{
- delete (ZHandle*)zh;
- return 0;
-}
-
-ZOOAPI int zoo_create(zhandle_t *zh, const char *pathOrPrefix, const char *value,
- int valuelen, const struct ACL_vector *, int flags,
- char *path_buffer, int path_buffer_len)
-{
- std::string path = pathOrPrefix;
- if (flags & ZOO_SEQUENCE)
- path += boost::lexical_cast<std::string>(((ZHandle*)zh)->sequence++);
-
- strncpy(path_buffer, path.c_str(), path_buffer_len);
- Node& node = ((ZHandle*)zh)->getNode(path);
- node.exists = true;
-
- if (flags & ZOO_EPHEMERAL)
- ((ZHandle*)zh)->ephemeralNode(path);
-
- node.buffer.resize(valuelen);
- std::copy(value, value + valuelen, node.buffer.begin());
-
-
- node.triggerWatches(zh, path);
- ((ZHandle*)zh)->getParent(path).triggerWatches(zh,
- parentPathAndChildName(path).first);
-
- return 0;
-}
-
-
-ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer,
- int buflen, int version) {
- (void)version;
-
- Node& node = ((ZHandle*)zh)->getNode(path);
- if (!node.exists)
- return ZNONODE;
-
-
- node.buffer.resize(buflen);
- std::copy(buffer, buffer + buflen, node.buffer.begin());
-
- node.triggerWatches(zh, path);
- return 0;
-}
-
-
-
-ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch,
- struct String_vector *strings)
-{
- (void)watch;
- return zoo_wget_children(zh, path,
- 0, 0,
- strings);
-}
-
-ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path,
- watcher_fn watcher, void* watcherCtx,
- struct String_vector *strings)
-{
- Node& node = ((ZHandle*)zh)->getNode(path);
- strings->count = node.children.size();
- strings->data = new char*[strings->count];
-
- int index = 0;
- for (auto i = node.children.begin(); i != node.children.end(); ++i) {
- strings->data[index] = new char[i->first.length() + 1];
- std::strcpy(strings->data[index], &*i->first.begin());
- ++index;
- }
-
- node.addWatcher(watcher, watcherCtx);
-
- return 0;
-}
-
-
-
-
-ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version)
-{
- (void)version;
-
- std::string pathStr = path;
- int index = pathStr.find_last_of("/");
-
- if (pathStr.length() == 1)
- throw "Can't delete root";
-
- Node& parent = ((ZHandle*)zh)->getNode(pathStr.substr(0, index));
- parent.children.erase(pathStr.substr(index + 1));
-
- ((ZHandle*)zh)->getParent(path).triggerWatches(zh,
- parentPathAndChildName(path).first);
-
- return 0;
-}
-
-void zoo_set_log_stream(FILE*) {}
-
-int deallocate_String_vector(struct String_vector *v) {
- for (int i=0; i< v->count; ++i) {
- delete[] v->data[i];
- }
- delete[] v->data;
- return 0;
-}
-
-
-ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
- int* buffer_len, struct Stat *stat)
-{
- (void)watch;
-
- return zoo_wget(zh, path,
- 0, 0,
- buffer, buffer_len, stat);
-
-}
-
-ZOOAPI int zoo_wget(zhandle_t *zh, const char *path,
- watcher_fn watcher, void* watcherCtx,
- char *buffer, int* buffer_len, struct Stat *)
-{
- Node& node = ((ZHandle*)zh)->getNode(path);
- std::copy(node.buffer.begin(), node.buffer.end(), buffer);
- *buffer_len = node.buffer.size();
-
- node.addWatcher(watcher, watcherCtx);
- return 0;
-}
-
-ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path,
- watcher_fn watcher, void* watcherCtx, struct Stat *)
-{
- Node& node = ((ZHandle*)zh)->getNode(path);
-
- node.addWatcher(watcher, watcherCtx);
- return node.exists ? ZOK : ZNONODE;
-}
-
-ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
-{
- (void)watch;
- return zoo_wexists(zh, path,
- 0, 0,
- stat);
-}
-
-
-
-
-ZOOAPI ACL_vector ZOO_OPEN_ACL_UNSAFE;
-
-ZOOAPI const int ZOO_SEQUENCE = 1;
-ZOOAPI const int ZOO_EPHEMERAL = 2;
-ZOOAPI const int ZOO_SESSION_EVENT = 3;
-ZOOAPI const int ZOO_EXPIRED_SESSION_STATE = 4;
-ZOOAPI const int ZOO_AUTH_FAILED_STATE = 5;
-}
diff --git a/filedistribution/src/tests/rpc/.gitignore b/filedistribution/src/tests/rpc/.gitignore
deleted file mode 100644
index b29a47efd87..00000000000
--- a/filedistribution/src/tests/rpc/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_rpc_test_app
diff --git a/filedistribution/src/tests/rpc/CMakeLists.txt b/filedistribution/src/tests/rpc/CMakeLists.txt
deleted file mode 100644
index 1882959f94f..00000000000
--- a/filedistribution/src/tests/rpc/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_rpc_test_app TEST
- SOURCES
- testfileprovider.cpp
- DEPENDS
- filedistribution_filedistributorrpc
- filedistribution_common
-)
-target_compile_options(filedistribution_rpc_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_rpc_test_app NO_VALGRIND COMMAND filedistribution_rpc_test_app)
diff --git a/filedistribution/src/tests/rpc/mockfileprovider.h b/filedistribution/src/tests/rpc/mockfileprovider.h
deleted file mode 100644
index be0a6100165..00000000000
--- a/filedistribution/src/tests/rpc/mockfileprovider.h
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/filedistribution/rpc/fileprovider.h>
-#include <boost/thread/barrier.hpp>
-
-namespace filedistribution {
-
-class MockFileProvider : public FileProvider {
- DownloadCompletedSignal _downloadCompleted;
- DownloadFailedSignal _downloadFailed;
-public:
- static const std::string _queueForeverFileReference;
-
- boost::barrier _queueForeverBarrier;
-
- boost::optional<Path> getPath(const std::string& fileReference) override {
- if (fileReference == "dd") {
- return Path("direct/result/path");
- } else {
- return boost::optional<Path>();
- }
- }
-
- void downloadFile(const std::string& fileReference) override {
- if (fileReference == _queueForeverFileReference) {
- _queueForeverBarrier.wait();
- return;
- }
-
- sleep(1);
- downloadCompleted()(fileReference, "downloaded/path/" + fileReference);
- }
-
- DownloadCompletedSignal& downloadCompleted() override {
- return _downloadCompleted;
- }
-
- DownloadFailedSignal& downloadFailed() override {
- return _downloadFailed;
- }
-
- MockFileProvider()
- :_queueForeverBarrier(2)
- {}
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/tests/rpc/testfileprovider.cpp b/filedistribution/src/tests/rpc/testfileprovider.cpp
deleted file mode 100644
index 2879258272a..00000000000
--- a/filedistribution/src/tests/rpc/testfileprovider.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-
-#include "mockfileprovider.h"
-#include <vespa/filedistribution/rpc/filedistributorrpc.h>
-#include <vespa/frtstream/frtclientstream.h>
-#include <vespa/fnet/frt/rpcrequest.h>
-#include <vespa/fnet/frt/target.h>
-#include <boost/test/unit_test.hpp>
-
-namespace fd = filedistribution;
-
-using fd::MockFileProvider;
-
-const std::string MockFileProvider::_queueForeverFileReference("queue-forever");
-
-BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) {
- const std::string spec("tcp/localhost:9111");
- fd::FileProvider::SP provider(new fd::MockFileProvider());
- fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
- fileDistributorRPC->start();
-
- frtstream::FrtClientStream rpc(spec);
- frtstream::Method method("waitFor");
-
- std::string path;
- rpc <<method <<"dd";
- rpc >> path;
- BOOST_CHECK_EQUAL("direct/result/path", path);
-
- rpc <<method <<"0123456789abcdef";
- rpc >> path;
- BOOST_CHECK_EQUAL("downloaded/path/0123456789abcdef", path);
-}
-
-//must be run through valgrind
-BOOST_AUTO_TEST_CASE(require_that_queued_requests_does_not_leak_memory) {
- const std::string spec("tcp/localhost:9111");
- std::shared_ptr<MockFileProvider> provider(new MockFileProvider());
- fd::FileDistributorRPC::SP fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
- fileDistributorRPC->start();
-
- FRT_Supervisor supervisor;
-
- supervisor.Start();
- FRT_Target *target = supervisor.GetTarget(spec.c_str());
-
- FRT_RPCRequest* request = supervisor.AllocRPCRequest();
- request->SetMethodName("waitFor");
- request->GetParams()->AddString(MockFileProvider::_queueForeverFileReference.c_str());
- target->InvokeVoid(request);
-
- provider->_queueForeverBarrier.wait(); //the request has been enqueued.
- fileDistributorRPC.reset();
-
- target->SubRef();
- supervisor.ShutDown(true);
-
-}
-
-BOOST_AUTO_TEST_CASE(require_that_port_can_be_extracted_from_connection_spec) {
- BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/host:9056"));
- BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/9056"));
- BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("9056"));
-}
diff --git a/filedistribution/src/tests/scheduler/.gitignore b/filedistribution/src/tests/scheduler/.gitignore
deleted file mode 100644
index b1976d1c516..00000000000
--- a/filedistribution/src/tests/scheduler/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_scheduler_test_app
diff --git a/filedistribution/src/tests/scheduler/CMakeLists.txt b/filedistribution/src/tests/scheduler/CMakeLists.txt
deleted file mode 100644
index 763d87cfc64..00000000000
--- a/filedistribution/src/tests/scheduler/CMakeLists.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_scheduler_test_app TEST
- SOURCES
- test-scheduler.cpp
- DEPENDS
- filedistribution_distributor
- filedistribution_filedistributionmodel
- filedistribution_common
-)
-target_compile_options(filedistribution_scheduler_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_scheduler_test_app NO_VALGRIND COMMAND filedistribution_scheduler_test_app)
diff --git a/filedistribution/src/tests/scheduler/test-scheduler.cpp b/filedistribution/src/tests/scheduler/test-scheduler.cpp
deleted file mode 100644
index d5afe7e4b11..00000000000
--- a/filedistribution/src/tests/scheduler/test-scheduler.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-
-#include <boost/test/unit_test.hpp>
-
-#include <vespa/filedistribution/distributor/scheduler.h>
-
-#include <iostream>
-
-#include <boost/thread/barrier.hpp>
-
-using filedistribution::Scheduler;
-using namespace std::literals;
-
-namespace asio = boost::asio;
-
-class TestException {};
-
-
-struct CallRun {
- volatile bool _caughtException;
-
- CallRun()
- :_caughtException(false)
- {}
-
- void operator()(asio::io_service& ioService) {
- try {
- //No reset needed after handling exceptions.
- ioService.run();
- } catch(const TestException& e ) {
- _caughtException = true;
- }
- }
-};
-
-struct Fixture {
- CallRun callRun;
- Scheduler scheduler;
-
- Fixture()
- : scheduler(std::ref(callRun))
- {}
-};
-
-
-BOOST_FIXTURE_TEST_SUITE(SchedulerTest, Fixture)
-
-
-struct RepeatedTask : Scheduler::Task {
- void doHandle() override {
- std::cout <<"RepeatedTask::doHandle " <<std::endl;
- schedule(boost::posix_time::seconds(1));
- }
-
- RepeatedTask(Scheduler& scheduler) : Task(scheduler) {}
-};
-
-BOOST_AUTO_TEST_CASE(require_tasks_does_not_keep_scheduler_alive) {
- RepeatedTask::SP task(new RepeatedTask(scheduler));
- task->schedule(boost::posix_time::hours(10));
-}
-
-struct EnsureInvokedTask : Scheduler::Task {
- boost::barrier& _barrier;
-
- void doHandle() override {
- _barrier.wait();
- }
-
- EnsureInvokedTask(Scheduler& scheduler, boost::barrier& barrier) :
- Task(scheduler),
- _barrier(barrier)
- {}
-};
-
-
-BOOST_AUTO_TEST_CASE(require_task_invoked) {
- boost::barrier barrier(2);
-
- EnsureInvokedTask::SP task(new EnsureInvokedTask(scheduler, barrier));
- task->schedule(boost::posix_time::milliseconds(50));
-
- barrier.wait();
-}
-
-struct ThrowExceptionTask : Scheduler::Task {
- void doHandle() override {
- throw TestException();
- }
-
- ThrowExceptionTask(Scheduler& scheduler) :
- Task(scheduler)
- {}
-};
-
-BOOST_AUTO_TEST_CASE(require_exception_from_tasks_can_be_caught) {
- ThrowExceptionTask::SP task(new ThrowExceptionTask(scheduler));
- task->scheduleNow();
-
- for (int i=0; i<200 && !callRun._caughtException; ++i) {
- std::this_thread::sleep_for(100ms);
- }
-
- BOOST_CHECK(callRun._caughtException);
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/tests/status/.gitignore b/filedistribution/src/tests/status/.gitignore
deleted file mode 100644
index 3da528fcd45..00000000000
--- a/filedistribution/src/tests/status/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_status_test_app
diff --git a/filedistribution/src/tests/status/CMakeLists.txt b/filedistribution/src/tests/status/CMakeLists.txt
deleted file mode 100644
index 646cc1351a9..00000000000
--- a/filedistribution/src/tests/status/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_status_test_app TEST
- SOURCES
- test-status.cpp
- DEPENDS
- filedistribution_filedistributionmodel
- filedistribution_common
-)
-target_compile_options(filedistribution_status_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_status_test_app NO_VALGRIND COMMAND filedistribution_status_test_app)
diff --git a/filedistribution/src/tests/status/test-status.cpp b/filedistribution/src/tests/status/test-status.cpp
deleted file mode 100644
index f50626aabdc..00000000000
--- a/filedistribution/src/tests/status/test-status.cpp
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-#include <boost/test/unit_test.hpp>
-
-#include <vespa/filedistribution/model/zkfacade.h>
-#include <vespa/filedistribution/model/filedistributionmodel.h>
-#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
-
-using namespace filedistribution;
-
-
-BOOST_AUTO_TEST_CASE(test_retrieve_status) {
- // TODO:
-}
-
diff --git a/filedistribution/src/tests/zkfacade/.gitignore b/filedistribution/src/tests/zkfacade/.gitignore
deleted file mode 100644
index 6ffef2339b1..00000000000
--- a/filedistribution/src/tests/zkfacade/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_zkfacade_test_app
diff --git a/filedistribution/src/tests/zkfacade/CMakeLists.txt b/filedistribution/src/tests/zkfacade/CMakeLists.txt
deleted file mode 100644
index 18dc8121049..00000000000
--- a/filedistribution/src/tests/zkfacade/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_zkfacade_test_app
- SOURCES
- test-zkfacade.cpp
- DEPENDS
- filedistribution_filedistributionmodel
- filedistribution_common
- filedistribution_mocks
-)
-target_compile_options(filedistribution_zkfacade_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
diff --git a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
deleted file mode 100644
index de4be087432..00000000000
--- a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-#define BOOST_TEST_MODULE zkfacade test
-
-#include <boost/test/unit_test.hpp>
-#include <iostream>
-#include <boost/thread/barrier.hpp>
-#include <vespa/filedistribution/common/componentsdeleter.h>
-#include <vespa/filedistribution/model/zkfacade.h>
-
-#include <zookeeper/zookeeper.h>
-
-
-using namespace std::literals;
-using namespace filedistribution;
-
-namespace {
-
-
-struct Watcher : public ZKFacade::NodeChangedWatcher {
- boost::barrier _barrier;
-
- Watcher() :
- _barrier(2) {}
-
- void operator()() override {
- _barrier.wait();
- }
-};
-
-struct Fixture {
- ComponentsDeleter _componentsDeleter;
- std::shared_ptr<ZKFacade> zk;
- Path testNode;
-
- Fixture() {
- zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", false));
-
- testNode = "/test-node";
- zk->removeIfExists(testNode);
- }
-
- ~Fixture() {
- if (zk) {
- zk->removeIfExists(testNode);
- }
- }
-};
-
-} //anonymous namespace
-
-
-BOOST_FIXTURE_TEST_SUITE(ZKFacadeTests, Fixture)
-
-BOOST_AUTO_TEST_CASE(hasNode)
-{
- zk->setData(testNode, "", 0);
- BOOST_CHECK(zk->hasNode(testNode));
-
- zk->remove(testNode);
- BOOST_CHECK(!zk->hasNode(testNode));
-}
-
-BOOST_AUTO_TEST_CASE(getValidZKServers)
-{
- BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("localhost:22", false));
- BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("localhost:22", true));
- BOOST_CHECK_EQUAL("idonotexist:22", ZKFacade::getValidZKServers("idonotexist:22", false));
- BOOST_CHECK_EQUAL("", ZKFacade::getValidZKServers("idonotexist:22", true));
- BOOST_CHECK_EQUAL("localhost:22,idonotexist:22", ZKFacade::getValidZKServers("localhost:22,idonotexist:22", false));
- BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("localhost:22,idonotexist:22", true));
- BOOST_CHECK_EQUAL("idonotexist:22,localhost:22", ZKFacade::getValidZKServers("idonotexist:22,localhost:22", false));
- BOOST_CHECK_EQUAL("localhost:22", ZKFacade::getValidZKServers("idonotexist:22,localhost:22", true));
-}
-
-BOOST_AUTO_TEST_CASE(hasNodeNotification)
-{
- std::shared_ptr<Watcher> watcher(new Watcher);
-
- zk->hasNode(testNode, watcher);
- zk->setData(testNode, "", 0);
- watcher->_barrier.wait();
-
- //after the notification has returned, the watcher must no longer reside in watchers map.
- for (int i=0; i<20 && !watcher.unique(); ++i) {
- std::this_thread::sleep_for(100ms);
- }
- BOOST_CHECK(watcher.unique());
-}
-
-BOOST_AUTO_TEST_CASE(getAndSetData)
-{
- std::string inputString = "test data.";
- Buffer inputBuffer(inputString.begin(), inputString.end());
-
- zk->setData(testNode, inputBuffer);
-
- Buffer outputBuffer(zk->getData(testNode));
- std::string outputString(outputBuffer.begin(), outputBuffer.end());
-
- BOOST_CHECK(outputString == inputString);
-
- outputString = zk->getString(testNode);
- BOOST_CHECK(outputString == inputString);
-}
-
-BOOST_AUTO_TEST_CASE(setDataMustExist)
-{
- bool mustExist = true;
- BOOST_REQUIRE_THROW(zk->setData(testNode, "", 0, mustExist), ZKNodeDoesNotExistsException);
-}
-
-BOOST_AUTO_TEST_CASE(createSequenceNode)
-{
- zk->setData(testNode, "", 0);
-
- Path prefix = testNode / "prefix";
- zk->createSequenceNode(prefix, "test", 4);
- zk->createSequenceNode(prefix, "test", 4);
- zk->createSequenceNode(prefix, "test", 4);
-
- std::vector<std::string> children = zk->getChildren(testNode);
- BOOST_CHECK(children.size() == 3);
- BOOST_CHECK(children.begin()->substr(0,6) == "prefix");
-
- Buffer buffer(zk->getData(testNode / *children.begin()));
- std::string bufferContent(buffer.begin(), buffer.end());
-
- BOOST_CHECK(bufferContent == "test");
-}
-
-BOOST_AUTO_TEST_CASE(retainOnly)
-{
- zk->setData(testNode, "", 0);
-
- zk->setData(testNode / "a", "", 0);
- zk->setData(testNode / "b", "", 0);
- zk->setData(testNode / "c", "", 0);
- zk->setData(testNode / "d", "", 0);
-
- std::vector<std::string> toRetain;
- toRetain.push_back("a");
- toRetain.push_back("c");
-
- zk->retainOnly(testNode, toRetain);
- std::vector<std::string> children = zk->getChildren(testNode);
-
- std::sort(children.begin(), children.end());
- BOOST_CHECK(children == toRetain);
-}
-
-
-
-BOOST_AUTO_TEST_CASE(addEphemeralNode)
-{
- Path ephemeralNode = "/test-ephemeral-node";
- zk->removeIfExists(ephemeralNode);
-
- //Checked deleter is ok here since we're not installing any watchers
- ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181", false), boost::checked_deleter<ZKFacade>());
- zk2->addEphemeralNode(ephemeralNode);
-
- BOOST_CHECK(zk->hasNode(ephemeralNode));
- zk2.reset();
- BOOST_CHECK(!zk->hasNode(ephemeralNode));
-}
-
-
-
-BOOST_AUTO_TEST_CASE(dataChangedNotification)
-{
- std::shared_ptr<Watcher> watcher(new Watcher);
-
- zk->setData(testNode, "", 0);
- Buffer buffer(zk->getData(testNode, watcher));
- BOOST_CHECK(buffer.size() == 0);
-
- bool mustExist = true;
- zk->setData(testNode, "test", 4, mustExist);
- watcher->_barrier.wait();
-}
-
-BOOST_AUTO_TEST_CASE(getChildrenNotification)
-{
- std::shared_ptr<Watcher> watcher(new Watcher);
-
- zk->setData(testNode, "", 0);
- zk->getChildren(testNode, watcher);
-
- zk->setData(testNode / "child", "", 0);
- watcher->_barrier.wait();
-}
-
-BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback)
-{
- struct DeleteZKFacadeWatcher : public Watcher {
- std::shared_ptr<ZKFacade> _zk;
-
- DeleteZKFacadeWatcher(const std::shared_ptr<ZKFacade>& zk)
- :_zk(zk)
- {}
-
- void operator()() override {
- BOOST_CHECK(_zk.use_count() == 2);
- _zk.reset();
- Watcher::operator()();
- }
- };
-
- std::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk));
-
- zk->setData(testNode, "", 0);
- zk->getData(testNode, watcher);
-
- ZKFacade* unprotectedZk = zk.get();
- zk.reset();
-
- unprotectedZk->setData(testNode, "t", 1);
- watcher->_barrier.wait();
-
- //Must wait longer than the zookeeper_close timeout to catch
- //problems due to closing zookeeper in a zookeeper watcher thread.
- sleep(3);
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/tests/zkfiledbmodel/.gitignore b/filedistribution/src/tests/zkfiledbmodel/.gitignore
deleted file mode 100644
index 18c14c47c56..00000000000
--- a/filedistribution/src/tests/zkfiledbmodel/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-filedistribution_zkfiledbmodel_test_app
diff --git a/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt b/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt
deleted file mode 100644
index e21d0bacf6e..00000000000
--- a/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(filedistribution_zkfiledbmodel_test_app TEST
- SOURCES
- test-zkfiledbmodel.cpp
- DEPENDS
- filedistribution_filedistributionmodel
- filedistribution_common
- filedistribution_mocks
-)
-target_compile_options(filedistribution_zkfiledbmodel_test_app PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_system${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_program_options${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_filesystem${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_unit_test_framework${VESPA_BOOST_LIB_SUFFIX})
-vespa_add_test(NAME filedistribution_zkfiledbmodel_test_app NO_VALGRIND COMMAND filedistribution_zkfiledbmodel_test_app)
diff --git a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
deleted file mode 100644
index 20fc4364dc8..00000000000
--- a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#define BOOST_TEST_DYN_LINK
-#define BOOST_TEST_MAIN
-#define BOOST_TEST_MODULE zkfiledbmodel test
-#include <boost/test/unit_test.hpp>
-
-#include <iostream>
-
-#include <vespa/filedistribution/common/componentsdeleter.h>
-#include <vespa/filedistribution/model/zkfacade.h>
-#include <vespa/filedistribution/model/zkfiledbmodel.h>
-
-#include <zookeeper/zookeeper.h>
-
-
-using namespace filedistribution;
-
-namespace {
-
-struct Fixture {
- ComponentsDeleter _componentsDeleter;
- std::shared_ptr<ZKFacade> zk;
- std::shared_ptr<ZKFileDBModel> model;
-
- Fixture() {
- zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", false));
- zk->setData("/vespa", "", 0);
-
- model = _componentsDeleter.track(new ZKFileDBModel(zk));
- }
-};
-
-} //anonymous namespace
-
-
-BOOST_FIXTURE_TEST_SUITE(ZKFileDBModelTests, Fixture)
-
-BOOST_AUTO_TEST_CASE(retainOnlyHostsForTenant)
-{
- Path path = "/vespa/filedistribution/hosts";
- std::vector<std::string> files = {"myfile"};
- BOOST_CHECK(zk->hasNode("/vespa"));
- BOOST_CHECK(zk->hasNode("/vespa/filedistribution"));
- BOOST_CHECK(zk->hasNode(path));
- model->setDeployedFilesToDownload("testhost", "myapp:so:cool", files);
- model->setDeployedFilesToDownload("testhost2", "myapp:so:cool", files);
- model->setDeployedFilesToDownload("testhost3", "myapp:so:cool", files);
- model->setDeployedFilesToDownload("testhost3", "myapp:legacyid:so:cool", files);
- model->setDeployedFilesToDownload("testhost3", "yourapp:so:cool", files);
- BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1);
- BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
- BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 3);
-
- model->cleanDeployedFilesToDownload({"testhost3"}, "yourapp:so:cool");
- model->removeDeploymentsThatHaveDifferentApplicationId({"testhost3"}, "yourapp:so:cool");
- BOOST_CHECK(zk->hasNode(path / "testhost"));
- BOOST_CHECK(zk->hasNode(path / "testhost2"));
- BOOST_CHECK(zk->hasNode(path / "testhost3"));
- BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1);
- BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
- BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1);
-
- model->cleanDeployedFilesToDownload({"testhost"}, "myapp:not:cool");
- model->removeDeploymentsThatHaveDifferentApplicationId({"testhost"}, "myapp:not:cool");
- BOOST_CHECK(zk->hasNode(path / "testhost"));
- BOOST_CHECK(zk->hasNode(path / "testhost2"));
- BOOST_CHECK(zk->hasNode(path / "testhost3"));
- BOOST_CHECK(zk->getChildren(path / "testhost").size() == 0);
- BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
- BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1);
-
- model->cleanDeployedFilesToDownload({"testhost2"}, "myapp:so:cool");
- model->removeDeploymentsThatHaveDifferentApplicationId({"testhost2"}, "myapp:so:cool");
-
- BOOST_CHECK(!zk->hasNode(path / "testhost"));
- BOOST_CHECK(zk->hasNode(path / "testhost2"));
- BOOST_CHECK(zk->hasNode(path / "testhost3"));
- BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
- BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1);
-
- model->cleanDeployedFilesToDownload({"testhost2"}, "yourapp:so:cool");
- BOOST_CHECK(!zk->hasNode(path / "testhost"));
- BOOST_CHECK(zk->hasNode(path / "testhost2"));
- BOOST_CHECK(!zk->hasNode(path / "testhost3"));
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/vespa/.gitignore b/filedistribution/src/vespa/.gitignore
deleted file mode 100644
index a477b4a6e5f..00000000000
--- a/filedistribution/src/vespa/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-Makefile
-config-*.h
-config-*.cpp
diff --git a/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt
deleted file mode 100644
index 129b084b400..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(filedistribution_exceptionrethrower OBJECT
- SOURCES
- exception.cpp
- DEPENDS
-)
-vespa_add_library(filedistribution_common STATIC
- SOURCES
- componentsdeleter.cpp
- exception.cpp
- vespa_logfwd.cpp
- DEPENDS
-)
-target_compile_options(filedistribution_common PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
-vespa_add_target_system_dependency(filedistribution_common boost boost_thread${VESPA_BOOST_LIB_SUFFIX})
diff --git a/filedistribution/src/vespa/filedistribution/common/buffer.h b/filedistribution/src/vespa/filedistribution/common/buffer.h
deleted file mode 100644
index 258f43be90c..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/buffer.h
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <algorithm>
-
-namespace filedistribution {
-
-class Buffer {
- size_t _capacity;
- char* _buf;
- size_t _size;
-public:
- typedef char value_type;
- typedef const value_type& const_reference;
- typedef char* iterator;
- typedef const char* const_iterator;
-
- Buffer(const Buffer &) = delete;
- Buffer & operator = (const Buffer &) = delete;
- explicit Buffer(size_t capacityArg)
- :_capacity(capacityArg),
- _buf( new char[_capacity] ),
- _size(0)
- {}
-
- Buffer(Buffer && rhs) :
- _capacity(rhs._capacity),
- _buf(rhs._buf),
- _size(rhs._size)
- {
- rhs._capacity = 0;
- rhs._size = 0;
- rhs._buf = nullptr;
- }
-
- template <typename ITER>
- Buffer(ITER beginIter, ITER endIter)
- : _capacity(endIter-beginIter),
- _buf( new char[_capacity] ),
- _size(_capacity)
- {
- std::copy(beginIter, endIter, begin());
- }
-
- ~Buffer() {
- delete[] _buf;
- }
-
- size_t capacity() const { return _capacity; }
- size_t size() const { return _size; }
-
- //might expose uninitialized memory
- void resize(size_t newSize) {
- if ( newSize <= _capacity )
- _size = newSize;
- else {
- reserve(newSize);
- _size = newSize;
- }
- }
-
- void reserve(size_t newCapacity) {
- if ( newCapacity > _capacity ) {
- Buffer buffer(newCapacity);
- buffer._size = _size;
- std::copy(begin(), end(), buffer.begin());
- buffer.swap(*this);
- }
- }
-
- void swap(Buffer& other) {
- std::swap(_capacity, other._capacity);
- std::swap(_buf, other._buf);
- std::swap(_size, other._size);
- }
-
- void push_back(char c) {
- if (_size == _capacity)
- reserve(_capacity * 2);
- _buf[_size++] = c;
- }
-
- iterator begin() { return _buf; }
- iterator end() { return _buf + _size; }
- const_iterator begin() const { return _buf; }
- const_iterator end() const { return _buf + _size; }
- char operator[](size_t i) const { return _buf[i]; }
- char& operator[](size_t i) { return _buf[i]; }
-};
-
-} //namespace filedistribution
-
-
diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
deleted file mode 100644
index 1dc0975a925..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "componentsdeleter.h"
-#include <cassert>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".componentsdeleter");
-
-using namespace std::literals;
-using namespace filedistribution;
-
-struct ComponentsDeleter::Worker {
- ComponentsDeleter& _parent;
-
- Worker(ComponentsDeleter* parent)
- :_parent(*parent) {}
-
- void operator()();
-};
-
-
-void
-ComponentsDeleter::Worker::operator()()
-{
- while ( ! _parent.areWeDone() ) {
- CallDeleteFun deleteFun = _parent._deleteRequests.pop();
- deleteFun();
- }
-}
-
-ComponentsDeleter::ComponentsDeleter() :
- _closed(false),
- _deleterThread(Worker(this))
-{}
-
-ComponentsDeleter::~ComponentsDeleter()
-{
- close();
- waitForAllComponentsDeleted();
- _deleterThread.join();
-}
-
-void
-ComponentsDeleter::waitForAllComponentsDeleted()
-{
- LOG(debug, "Waiting for all components to be deleted");
-
- for (int i=0; i<600 && !areWeDone(); ++i) {
- std::this_thread::sleep_for(100ms);
- }
- LOG(debug, "Done waiting for all components to be deleted");
- assert(_trackedComponents.empty());
- assert(_deleteRequests.empty());
-}
-
-void
-ComponentsDeleter::close()
-{
- {
- LockGuard guard(_trackedComponentsMutex);
- _closed = true;
- }
- _deleteRequests.push([]() { LOG(debug, "I am the last one, hurry up and shutdown"); });
-}
-
-bool
-ComponentsDeleter::areWeDone()
-{
- LockGuard guard(_trackedComponentsMutex);
- return _closed && _trackedComponents.empty() && _deleteRequests.empty();
-}
-
-void
-ComponentsDeleter::removeFromTrackedComponents(void* component) {
- LockGuard guard(_trackedComponentsMutex);
- if (_trackedComponents.count(component))
- LOG(debug, "Deleting '%s'", _trackedComponents[component].c_str());
-
- size_t numErased = _trackedComponents.erase(component);
- assert(numErased == 1);
- (void) numErased;
-}
diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
deleted file mode 100644
index 65860d3ad66..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "concurrentqueue.h"
-
-#include <map>
-#include <typeinfo>
-#include <string>
-#include <mutex>
-#include <thread>
-#include <functional>
-
-
-namespace filedistribution {
-/**
- * Ensures that components are deleted in a separate thread,
- * and that their lifetime is tracked.
- *
- * This prevents situations as e.g. deleting ZKFacade from a zookeeper watcher thread.
- */
-class ComponentsDeleter {
- class Worker;
- typedef std::lock_guard<std::mutex> LockGuard;
-
- std::mutex _trackedComponentsMutex;
- typedef std::map<void*, std::string> TrackedComponentsMap;
- TrackedComponentsMap _trackedComponents;
-
- typedef std::function<void (void)> CallDeleteFun;
- ConcurrentQueue<CallDeleteFun> _deleteRequests;
- bool _closed;
- std::thread _deleterThread;
-
- void removeFromTrackedComponents(void* component);
-
- template <class T>
- void deleteComponent(T* component) {
- removeFromTrackedComponents(component);
- delete component;
- }
-
- template <class T>
- void requestDelete(T* component) {
- _deleteRequests.push([this, component]() { deleteComponent<T>(component); });
- }
-
- void waitForAllComponentsDeleted();
- bool areWeDone();
- void close();
- public:
- ComponentsDeleter(const ComponentsDeleter &) = delete;
- ComponentsDeleter & operator = (const ComponentsDeleter &) = delete;
- ComponentsDeleter();
-
- /*
- * Waits blocking for up to 60 seconds until all components are deleted.
- * If it fails, the application is killed.
- */
- ~ComponentsDeleter();
-
- template <class T>
- std::shared_ptr<T> track(T* t) {
- LockGuard guard(_trackedComponentsMutex);
- if (_closed) {
- return std::shared_ptr<T>(t);
- }
-
- _trackedComponents[t] = typeid(t).name();
- return std::shared_ptr<T>(t, [this](T * p) { requestDelete<T>(p); });
- }
-};
-}
-
diff --git a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
deleted file mode 100644
index db23b56c500..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <queue>
-
-#include <mutex>
-#include <condition_variable>
-
-namespace filedistribution {
-
-template <typename T>
-class ConcurrentQueue {
-public:
- typedef T value_type;
-private:
- std::condition_variable _nonEmpty;
-
- mutable std::mutex _queueMutex;
- typedef std::unique_lock<std::mutex> UniqueLock;
-
- std::queue<value_type> _queue;
-
-public:
- void push(const T& t) {
- {
- UniqueLock guard(_queueMutex);
- _queue.push(t);
- }
- _nonEmpty.notify_one();
- }
-
- //Assumes that value_type has nonthrow copy constructors
- const T pop() {
- UniqueLock guard(_queueMutex);
- while (_queue.empty()) {
- _nonEmpty.wait(guard);
- }
- T result = _queue.front();
- _queue.pop();
- return result;
- }
-
- void clear() {
- UniqueLock guard(_queueMutex);
- while (!_queue.empty()) {
- _queue.pop();
- }
- }
- bool empty() {
- UniqueLock guard(_queueMutex);
- return _queue.empty();
- }
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/common/exception.cpp b/filedistribution/src/vespa/filedistribution/common/exception.cpp
deleted file mode 100644
index 499cbcea9b9..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/exception.cpp
+++ /dev/null
@@ -1,8 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "exception.h"
-
-namespace filedistribution {
-
-VESPA_IMPLEMENT_EXCEPTION(FileDoesNotExistException, vespalib::Exception);
-
-}
diff --git a/filedistribution/src/vespa/filedistribution/common/exception.h b/filedistribution/src/vespa/filedistribution/common/exception.h
deleted file mode 100644
index bb47aa3c779..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/exception.h
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/vespalib/util/exceptions.h>
-#include <boost/filesystem/path.hpp>
-
-namespace filedistribution {
-
-using Path = boost::filesystem::path;
-
-VESPA_DEFINE_EXCEPTION(FileDoesNotExistException, vespalib::Exception);
-
-}
diff --git a/filedistribution/src/vespa/filedistribution/common/logfwd.h b/filedistribution/src/vespa/filedistribution/common/logfwd.h
deleted file mode 100644
index 2732b9713fa..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/logfwd.h
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <cstdarg>
-
-namespace filedistribution {
-
-/** To avoid requiring vespa log from the jni library*/
-namespace logfwd {
-
-enum LogLevel { debug, error, warning, info };
-
-void log_forward(LogLevel level, const char *file, int line, const char *fmt, ...)
- __attribute__((format(printf,4,5)));
-
-} //namespace logfwd
-} //namespace filedistribution
-
-#define LOGFWD(level, ...) filedistribution::logfwd::log_forward(filedistribution::logfwd::level, \
- __FILE__, __LINE__, __VA_ARGS__)
-
-
diff --git a/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp b/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp
deleted file mode 100644
index 0da06ded32d..00000000000
--- a/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "logfwd.h"
-#include <vector>
-#include <vespa/log/log.h>
-
-LOG_SETUP(".common.model");
-
-
-using ns_log::Logger;
-
-namespace {
- Logger::LogLevel toVespaLogLevel(filedistribution::logfwd::LogLevel level) {
- namespace l = filedistribution::logfwd;
-
- switch (level) {
- case l::info: return Logger::info;
- case l::debug: return Logger::debug;
- case l::error: return Logger::error;
- case l::warning: return Logger::warning;
- default:
- LOG(error, "Unknown log level, falling back to error");
- return Logger::error;
- }
- }
-}
-
-void filedistribution::logfwd::log_forward(LogLevel level, const char* file, int line, const char* fmt, ...)
-{
-
- Logger::LogLevel vespaLogLevel = toVespaLogLevel(level);
-
- if (logger.wants(vespaLogLevel)) {
- const size_t maxSize(0x8000);
- std::vector<char> payload(maxSize);
- char * buf = &payload[0];
-
- va_list args;
- va_start(args, fmt);
- vsnprintf(buf, maxSize, fmt, args);
- va_end(args);
-
- logger.doLog(vespaLogLevel, file, line, "%s", buf);
- }
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt
deleted file mode 100644
index 6b575dc5526..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(filedistribution_distributor STATIC
- SOURCES
- filedistributortrackerimpl.cpp
- filedownloader.cpp
- filedownloadermanager.cpp
- hostname.cpp
- scheduler.cpp
- signalhandling.cpp
- state_server_impl.cpp
- DEPENDS
-)
-target_compile_options(filedistribution_distributor PRIVATE -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
deleted file mode 100644
index 185a620b7f3..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
+++ /dev/null
@@ -1,202 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "filedistributortrackerimpl.h"
-#include "filedownloader.h"
-#include <vespa/filedistribution/model/zkfacade.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".filedistributiontrackerimpl");
-
-using namespace filedistribution;
-
-typedef FileDistributionModel::PeerEntries PeerEntries;
-
-namespace asio = boost::asio;
-
-namespace {
-
-void
-filterSelf(FileDistributionModel::PeerEntries& peers,
- const std::string& hostName,
- int port)
-{
- FileDistributionModel::PeerEntries::iterator
- i = peers.begin(),
- currEnd = peers.end();
-
- while ( i != currEnd ) {
- //hostName is currently used in the ip field
- if (i->ip == hostName && i->port == port) {
- --currEnd;
- std::swap(*i, *currEnd);
- } else {
- ++i;
- }
- }
-
- peers.erase(currEnd, peers.end());
-}
-
-void resolveIPAddresses(PeerEntries& peers) {
- for (auto& p: peers) {
- try {
- p.ip = filedistribution::lookupIPAddress(p.ip);
- } catch (filedistribution::FailedResolvingHostName& e) {
- LOG(info, "Failed resolving address %s", p.ip.c_str());
- }
- }
-}
-
-struct TrackingTask : public Scheduler::Task {
- int _numTimesRescheduled;
-
- libtorrent::tracker_request _trackerRequest;
- boost::weak_ptr<libtorrent::torrent> _torrent;
- std::weak_ptr<FileDownloader> _downloader;
- std::shared_ptr<FileDistributionModel> _model;
-
- TrackingTask(Scheduler& scheduler,
- const libtorrent::tracker_request& trackerRequest,
- const TorrentSP & torrent,
- const std::weak_ptr<FileDownloader>& downloader,
- const std::shared_ptr<FileDistributionModel>& model);
- ~TrackingTask();
-
- //TODO: refactor
- void doHandle() override;
- PeerEntries getPeers(const std::shared_ptr<FileDownloader>& downloader);
- void reschedule();
-};
-
-TrackingTask::TrackingTask(Scheduler& scheduler,
- const libtorrent::tracker_request& trackerRequest,
- const TorrentSP & torrent,
- const std::weak_ptr<FileDownloader>& downloader,
- const std::shared_ptr<FileDistributionModel>& model)
- : Task(scheduler),
- _numTimesRescheduled(0),
- _trackerRequest(trackerRequest),
- _torrent(torrent),
- _downloader(downloader),
- _model(model)
-{ }
-
-TrackingTask::~TrackingTask() {}
-
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Winline"
-//TODO: refactor
-void
-TrackingTask::doHandle() {
- if (std::shared_ptr<FileDownloader> downloader = _downloader.lock()) {
- //All torrents must be destructed before the session is destructed.
- //It's okay to prevent the torrent from expiring here
- //since the session can't be destructed while
- //we hold a shared_ptr to the downloader.
- if (TorrentSP torrent = _torrent.lock()) {
- PeerEntries peers = getPeers(downloader);
-
- if (!peers.empty()) {
- torrent->session().m_io_service.dispatch(
- [torrent_weak_ptr = _torrent, trackerRequest = _trackerRequest, peers = peers]() mutable {
- if (auto torrent_sp = torrent_weak_ptr.lock()) {
- torrent_sp->tracker_response(
- trackerRequest,
- libtorrent::address(),
- std::list<libtorrent::address>(),
- peers,
- -1, -1, -1, -1, -1,
- libtorrent::address(), "trackerid");
- }
- });
- }
-
- if (peers.size() < 5) {
- reschedule();
- }
- }
- }
-}
-#pragma GCC diagnostic pop
-
-PeerEntries
-TrackingTask::getPeers(const std::shared_ptr<FileDownloader>& downloader) {
- std::string fileReference = downloader->infoHash2FileReference(_trackerRequest.info_hash);
-
- const size_t recommendedMaxNumberOfPeers = 30;
- PeerEntries peers = _model->getPeers(fileReference, recommendedMaxNumberOfPeers);
-
- //currently, libtorrent stops working if it tries to connect to itself.
- filterSelf(peers, downloader->_hostName, downloader->_port);
- resolveIPAddresses(peers);
- for (const auto& peer: peers) {
- LOG(debug, "Returning peer with ip %s", peer.ip.c_str());
- }
-
- return peers;
-}
-
-void
-TrackingTask::reschedule() {
- if (_numTimesRescheduled < 5) {
- double fudgeFactor = 0.1;
- schedule(boost::posix_time::seconds(static_cast<int>(
- std::pow(3., _numTimesRescheduled) + fudgeFactor)));
- _numTimesRescheduled++;
- }
-}
-
-} //anonymous namespace
-
-FileDistributorTrackerImpl::FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model) :
- _model(model)
-{}
-
-FileDistributorTrackerImpl::~FileDistributorTrackerImpl() {
- LOG(debug, "Deconstructing FileDistributorTrackerImpl");
-
- LockGuard guard(_mutex);
- _scheduler.reset();
-}
-
-void
-FileDistributorTrackerImpl::trackingRequest(
- libtorrent::tracker_request& request,
- const TorrentSP & torrent)
-{
- LockGuard guard(_mutex);
-
- if (torrent != TorrentSP()) {
- std::shared_ptr<TrackingTask> trackingTask(new TrackingTask(
- *_scheduler.get(), request, torrent, _downloader, _model));
-
- trackingTask->scheduleNow();
- }
-}
-
-void asioWorker(asio::io_service& ioService)
-{
- while (!ioService.stopped()) {
- try {
- ioService.run();
- } catch (const ZKConnectionLossException & e) {
- LOG(info, "Connection loss in asioWorker thread, resuming. %s", e.what());
- } catch (const ZKOperationTimeoutException & e) {
- LOG(warning, "Operation timed out in asioWorker thread, will do quick exit to start a clean sheet. %s", e.what());
- std::quick_exit(31);
- }
- }
-}
-
-void
-FileDistributorTrackerImpl::setDownloader(const std::shared_ptr<FileDownloader>& downloader)
-{
- LockGuard guard(_mutex);
-
- _scheduler.reset();
- _downloader = downloader;
-
- if (downloader) {
- _scheduler.reset(new Scheduler([] (asio::io_service& ioService) { asioWorker(ioService); }));
- }
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
deleted file mode 100644
index 5f701228631..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <libtorrent/session.hpp>
-#include <libtorrent/torrent.hpp>
-
-#include <vespa/filedistribution/model/filedistributionmodel.h>
-#include "scheduler.h"
-#include <mutex>
-
-namespace filedistribution {
-class FileDistributionModel;
-class FileDownloader;
-
-using TorrentSP = boost::shared_ptr<libtorrent::torrent>;
-
-class FileDistributorTrackerImpl : public FileDistributionTracker {
- const std::shared_ptr<FileDistributionModel> _model;
-
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _mutex;
- std::weak_ptr<FileDownloader> _downloader;
-
- //Use separate worker thread to avoid potential deadlock
- //between tracker requests and files to download changed requests.
- std::unique_ptr<Scheduler> _scheduler;
-public:
- FileDistributorTrackerImpl(const std::shared_ptr<FileDistributionModel>& model);
-
- virtual ~FileDistributorTrackerImpl();
-
- //overrides
- void trackingRequest(libtorrent::tracker_request& request, const TorrentSP & torrent) override;
-
- void setDownloader(const std::shared_ptr<FileDownloader>& downloader);
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
deleted file mode 100644
index 351cb5d5717..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
+++ /dev/null
@@ -1,460 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "filedownloader.h"
-#include "hostname.h"
-#include <vespa/filedistribution/model/zkfacade.h>
-#include <vespa/vespalib/util/stringfmt.h>
-
-#include <boost/filesystem.hpp>
-#include <boost/filesystem/fstream.hpp>
-#include <boost/filesystem/convenience.hpp>
-#include <boost/function_output_iterator.hpp>
-
-#include <libtorrent/alert.hpp>
-#include <libtorrent/alert_types.hpp>
-#include <libtorrent/torrent_handle.hpp>
-#include <libtorrent/bencode.hpp>
-
-#include <iterator>
-#include <algorithm>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".filedownloader");
-
-using namespace filedistribution;
-namespace fs = boost::filesystem;
-
-using libtorrent::sha1_hash;
-using libtorrent::torrent_handle;
-
-namespace {
-const std::string resumeDataSuffix = ".resume";
-const std::string resumeDataSuffixTemp = ".resumetemp";
-const std::string newFileSuffix = ".new";
-
-std::string
-fileReferenceToString(const libtorrent::sha1_hash& fileReference) {
- std::ostringstream fileReferenceString;
- fileReferenceString <<fileReference;
-
- assert (fileReferenceString.str().size() == 40);
- return fileReferenceString.str();
-}
-
-libtorrent::sha1_hash
-toInfoHash(const std::string& fileReference) {
- assert (fileReference.size() == 40);
- std::istringstream s(fileReference);
-
- sha1_hash infoHash;
- s >> infoHash;
- return infoHash;
-}
-
-void
-addNewFile(const fs::path& dbPath, const fs::path& newFile) {
- LOG(debug, "Adding new file: '%s'.", newFile.string().c_str());
- const fs::path destination = dbPath / newFile.stem();
-
- if ( fs::exists(destination) ) {
- fs::remove_all(destination);
- }
-
- fs::path resumeData = destination.string() + resumeDataSuffix;
- if ( fs::exists(resumeData) ) {
- fs::remove(resumeData);
- }
-
- fs::rename(newFile, destination);
-}
-
-void
-addNewDbFiles(const fs::path& dbPath) {
- for (fs::directory_iterator i(dbPath), end; i != end; ++i) {
- if (newFileSuffix == fs::extension(*i)) {
- addNewFile(dbPath, *i);
- }
- }
-}
-
-fs::path
-resumeDataPath(const libtorrent::torrent_handle& torrent) {
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- fs::path tmp(torrent.save_path());
-#pragma GCC diagnostic pop
- return tmp.string() + resumeDataSuffix;
-}
-
-fs::path
-resumeDataPathTemp(const libtorrent::torrent_handle& torrent) {
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- fs::path tmp(torrent.save_path());
-#pragma GCC diagnostic pop
- return tmp.string() + resumeDataSuffixTemp;
-}
-
-fs::path
-getMainFile(const libtorrent::torrent_handle& handle) {
- const libtorrent::torrent_info fallback(handle.info_hash());
- auto p = handle.torrent_file();
- const libtorrent::torrent_info& info = (p ? *p : fallback);
- return info.files().num_files() == 1 ?
- info.file_at(0).path :
- info.name();
-}
-
-std::string
-getMainName(const libtorrent::torrent_handle& handle) {
- const libtorrent::torrent_info fallback(handle.info_hash());
- auto p = handle.torrent_file();
- const libtorrent::torrent_info& info = (p ? *p : fallback);
- return info.name();
-}
-
-libtorrent::session_settings
-createSessionSettings() {
- libtorrent::session_settings s;
-
- const int unlimited = -1;
- s.active_downloads = s.active_seeds = s.active_limit = unlimited;
-
- s.min_reconnect_time = 1; //seconds
- s.min_announce_interval = 5 * 60; //seconds
- return s;
-}
-
-} //anonymous namespace
-
-namespace filedistribution {
- VESPA_IMPLEMENT_EXCEPTION(NoSuchTorrentException, vespalib::Exception);
-}
-
-struct FileDownloader::EventHandler
-{
- FileDownloader& _fileDownloader;
-
- EventHandler(FileDownloader* fileDownloader)
- : _fileDownloader(*fileDownloader)
- {}
-
- void defaultHandler(const libtorrent::alert& alert) const {
- LOG(debug, "alert %s: %s", alert.what(), alert.message().c_str());
- }
-
- void operator()(const libtorrent::listen_failed_alert& alert) const {
- throw vespalib::PortListenException(alert.endpoint.port(), alert.endpoint.address().to_string(), alert.message(), VESPA_STRLOC);
- }
- void operator()(const libtorrent::fastresume_rejected_alert& alert) const {
- LOG(debug, "alert %s: %s", alert.what(), alert.message().c_str());
- }
- void operator()(const libtorrent::torrent_delete_failed_alert& alert) const {
- LOG(warning, "alert %s: %s", alert.what(), alert.message().c_str());
- }
- void operator()(const libtorrent::file_error_alert& alert) const {
- LOG(error, "alert %s: %s", alert.what(), alert.message().c_str());
- }
-
- void operator()(const libtorrent::torrent_finished_alert& alert) const {
- defaultHandler(alert);
-
- std::string fileReference = fileReferenceToString(alert.handle.info_hash());
-
- LOG(debug, "File '%s' with file reference '%s' downloaded successfully.",
- getMainName(alert.handle).c_str(),
- fileReference.c_str());
-
- _fileDownloader.signalIfFinishedDownloading(fileReference);
- alert.handle.save_resume_data();
- _fileDownloader.didRequestSRD();
- }
-
- void operator()(const libtorrent::save_resume_data_failed_alert& alert) const {
- LOG(warning, "save resume data failed: %s -- %s",
- alert.what(), alert.message().c_str());
- _fileDownloader.didReceiveSRD();
- }
-
- void operator()(const libtorrent::save_resume_data_alert& alert) const {
- defaultHandler(alert);
-
- fs::ofstream resumeFile(resumeDataPathTemp(alert.handle), std::ios_base::binary);
- resumeFile.unsetf(std::ios_base::skipws);
- libtorrent::bencode(std::ostream_iterator<char>(resumeFile), *alert.resume_data);
- resumeFile.close();
- fs::rename(resumeDataPathTemp(alert.handle), resumeDataPath(alert.handle));
- _fileDownloader.didReceiveSRD();
- }
-
- void handle(std::unique_ptr<libtorrent::alert> alert) {
- try {
- libtorrent::handle_alert<
- libtorrent::torrent_finished_alert,
- libtorrent::save_resume_data_alert,
- libtorrent::save_resume_data_failed_alert,
- libtorrent::listen_failed_alert,
- libtorrent::file_error_alert,
- libtorrent::fastresume_rejected_alert,
- libtorrent::torrent_delete_failed_alert>
- dispatch(alert, *this);
- } catch (libtorrent::unhandled_alert& e) {
- LOG(debug, "alert (ignored): %s -- %s",
- alert->what(), alert->message().c_str());
- }
- }
-};
-
-FileDownloader::LogSessionDeconstructed::~LogSessionDeconstructed()
-{
- LOG(debug, "Libtorrent session closed successfully.");
-}
-
-FileDownloader::FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker,
- const std::string& hostName, int port,
- const fs::path& dbPath)
- : _outstanding_SRD_requests(0),
- _tracker(tracker),
- _session(tracker.get(), libtorrent::fingerprint("vp", 0, 0, 0, 0), 0),
- _closed(false),
- _dbPath(dbPath),
- _hostName(hostName),
- _port(port)
-{
- if (!fs::exists(_dbPath)) {
- fs::create_directories(_dbPath);
- }
- addNewDbFiles(_dbPath);
-
- _session.set_settings(createSessionSettings());
- listen();
- _session.set_alert_mask(
- libtorrent::alert::error_notification |
- libtorrent::alert::status_notification);
-
-}
-
-void
-FileDownloader::drain() {
- EventHandler eventHandler(this);
- size_t cnt = 0;
- size_t waitCount = 0;
- do {
- LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests.load());
- while (_session.wait_for_alert(libtorrent::milliseconds(20))) {
- std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
- eventHandler.handle(std::move(alert));
- ++cnt;
- }
- waitCount++;
- } while (!drained() && (waitCount < 1000));
- LOG(debug, "handled %zu alerts during draining.", cnt);
- if (!drained()) {
- LOG(error, "handled %zu alerts during draining. But there are still %zu left.", cnt, _outstanding_SRD_requests.load());
- LOG(error, "We have been waiting for stuff that did not happen.");
- }
-}
-
-FileDownloader::~FileDownloader() {
- assert(drained());
-}
-
-void
-FileDownloader::listen() {
- boost::system::error_code ec;
- _session.listen_on(std::make_pair(_port, _port), // (min, max)
- ec, 0, // network interface (default value)
- libtorrent::session::listen_no_system_port);
- if (!ec && (_session.listen_port() == _port)) {
- return;
- }
- throw vespalib::PortListenException(_port, _hostName, VESPA_STRLOC);
-}
-
-boost::optional< fs::path >
-FileDownloader::pathToCompletedFile(const std::string& fileReference) const {
- libtorrent::torrent_handle torrent = _session.find_torrent(toInfoHash(fileReference));
-
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- if (torrent.is_valid() && torrent.is_finished()) {
-#pragma GCC diagnostic pop
- return _dbPath / fileReference / getMainFile(torrent);
- } else {
- return boost::optional< fs::path>();
- }
-}
-
-
-boost::optional<FileDownloader::ResumeDataBuffer>
-FileDownloader::getResumeData(const std::string& fileReference) {
- LOG(debug, "Reading resume data for '%s'", fileReference.c_str());
- try {
- fs::path path = (_dbPath / fileReference).string() + resumeDataSuffix;
- if (fs::exists(path)) {
- fs::ifstream file(path, std::ios::binary);
- ResumeDataBuffer result;
-
- std::istream_iterator<char> iterator(file), end;
- std::copy(iterator, end, std::back_inserter(result));
- LOG(debug, "Successfully retrieved resume data for '%s'", fileReference.c_str());
- if (result.size() < 50) {
- LOG(info, "Very small resume file %zu bytes.", result.size());
- }
-
- return result;
- }
- } catch(...) {
- //resume data is only an optimization
- LOG(info, "Error while reading resume data for '%s'", fileReference.c_str());
- }
- return boost::optional<ResumeDataBuffer>();
-}
-
-
-bool
-FileDownloader::hasTorrent(const std::string& fileReference) const {
- return _session.find_torrent(toInfoHash(fileReference)).is_valid();
-}
-
-void
-FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffer) {
- if (closed()) { return; }
- LockGuard guard(_modifyTorrentsDownloadingMutex);
-
- boost::optional<ResumeDataBuffer> resumeData = getResumeData(fileReference);
-
- if (_session.find_torrent( (toInfoHash(fileReference))).is_valid())
- return;
-
- libtorrent::lazy_entry entry;
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(), entry); //out
-#pragma GCC diagnostic pop
-
- libtorrent::add_torrent_params torrentParams;
- torrentParams.save_path = (_dbPath / fileReference).string();
- torrentParams.ti = new libtorrent::torrent_info(entry);
-
- torrentParams.auto_managed = false;
- torrentParams.paused = false;
-
- if (resumeData) {
- torrentParams.resume_data = *resumeData; //vector will be swapped
- }
-
- libtorrent::torrent_handle torrentHandle = _session.add_torrent(torrentParams);
-
- LOG(debug, "Started downloading file '%s' with file reference '%s'.",
- getMainName(torrentHandle).c_str(), fileReference.c_str());
-}
-
-
-void
-FileDownloader::deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&) {
- if (torrent.is_valid()) {
- fs::path resumeFilePath = resumeDataPath(torrent);
-
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- fs::path savePath = torrent.save_path();
-#pragma GCC diagnostic pop
-
- //the files might not exist, so ignore return value
- fs::remove_all(savePath);
- fs::remove(resumeFilePath);
- }
-
- _downloadFailed(fileReferenceToString(torrent.info_hash()), FileProvider::FileReferenceRemoved);
-}
-
-void
-FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain) {
- if (closed()) { return; }
- LockGuard guard(_modifyTorrentsDownloadingMutex);
-
- std::set<std::string> currentFiles;
- std::set<sha1_hash> infoHashesToRetain;
- for (const std::string& fileReference : filesToRetain) {
- infoHashesToRetain.insert(toInfoHash(fileReference));
- }
-
- std::vector<torrent_handle> torrents = _session.get_torrents();
-
- for (torrent_handle torrent : torrents) {
- if (!infoHashesToRetain.count(torrent.info_hash())) {
- LOG(info, "Removing torrent: '%s' with file reference '%s'",
- getMainName(torrent).c_str(), fileReferenceToString(torrent.info_hash()).c_str());
-
- deleteTorrentData(torrent, guard);
- _session.remove_torrent(torrent);
- }
- }
-}
-
-
-void FileDownloader::runEventLoop() {
- EventHandler eventHandler(this);
- while ( ! closed() ) {
- try {
- if (_session.wait_for_alert(libtorrent::milliseconds(100))) {
- std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
- eventHandler.handle(std::move(alert));
- }
- } catch (const ZKConnectionLossException & e) {
- LOG(info, "Connection loss in downloader event loop, resuming. %s", e.what());
- } catch (const vespalib::PortListenException & e) {
- LOG(warning, "Failed listening to torrent port : %s", e.what());
- std::quick_exit(21);
- } catch (const boost::filesystem::filesystem_error & e) {
- LOG(warning, "Some boost file operations failed : %s", e.what());
- std::quick_exit(22);
- }
- }
- drain();
-}
-
-bool
-FileDownloader::closed() const
-{
- return _closed.load();
-}
-
-void
-FileDownloader::close()
-{
- _closed.store(true);
-}
-
-void
-FileDownloader::signalIfFinishedDownloading(const std::string& fileReference) {
- boost::optional<fs::path> path = pathToCompletedFile(fileReference);
-
- if (path) {
- _downloadCompleted(fileReference, *path);
- }
-}
-
-std::string
-FileDownloader::infoHash2FileReference(const libtorrent::sha1_hash& infoHash) {
- //TODO
- return fileReferenceToString(infoHash);
-}
-
-namespace {
-int
-toBytesPerSec(double MBPerSec) {
- return static_cast<int>(MBPerSec * 1024 * 1024);
-}
-}
-
-void
-FileDownloader::setMaxDownloadSpeed(double MBPerSec) {
- LOG(config, "Setting max download speed to %f MB/sec", MBPerSec);
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- _session.set_download_rate_limit(toBytesPerSec(MBPerSec));
-#pragma GCC diagnostic pop
-}
-
-void
-FileDownloader::setMaxUploadSpeed(double MBPerSec) {
- LOG(config, "Setting max upload speed to %f MB/sec", MBPerSec);
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- _session.set_upload_rate_limit(toBytesPerSec(MBPerSec));
-#pragma GCC diagnostic pop
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
deleted file mode 100644
index 3f608e85e64..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
+++ /dev/null
@@ -1,86 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vector>
-#include <mutex>
-#include <boost/optional.hpp>
-
-#include <libtorrent/session.hpp>
-
-#include <vespa/filedistribution/rpc/fileprovider.h>
-#include "hostname.h"
-#include <vespa/filedistribution/common/buffer.h>
-#include <vespa/filedistribution/common/exception.h>
-#include <vespa/filedistribution/model/filedbmodel.h>
-
-namespace filedistribution {
-
-VESPA_DEFINE_EXCEPTION(NoSuchTorrentException, vespalib::Exception);
-
-class FileDownloader
-{
- struct EventHandler;
- struct LogSessionDeconstructed {
- ~LogSessionDeconstructed();
- };
-
- std::atomic<size_t> _outstanding_SRD_requests;
- std::shared_ptr<FileDistributionTracker> _tracker;
-
- std::mutex _modifyTorrentsDownloadingMutex;
- typedef std::lock_guard<std::mutex> LockGuard;
-
- LogSessionDeconstructed _logSessionDeconstructed;
- //session is safe to use from multiple threads.
- libtorrent::session _session;
- std::atomic<bool> _closed;
-
- const Path _dbPath;
- typedef std::vector<char> ResumeDataBuffer;
- boost::optional<ResumeDataBuffer> getResumeData(const std::string& fileReference);
-
- class RemoveTorrent;
-
- void deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&);
- void listen();
- bool closed() const;
- void drain();
-public:
- // accounting of save-resume-data requests:
- void didRequestSRD() { ++_outstanding_SRD_requests; }
- void didReceiveSRD() { --_outstanding_SRD_requests; }
-
- typedef FileProvider::DownloadCompletedSignal DownloadCompletedSignal;
- typedef FileProvider::DownloadFailedSignal DownloadFailedSignal;
-
- FileDownloader(const std::shared_ptr<FileDistributionTracker>& tracker,
- const std::string& hostName, int port,
- const Path& dbPath);
- ~FileDownloader();
- DirectoryGuard::UP getGuard() { return std::make_unique<DirectoryGuard>(_dbPath); }
-
- void runEventLoop();
- void addTorrent(const std::string& fileReference, const Buffer& buffer);
- bool hasTorrent(const std::string& fileReference) const;
- boost::optional<Path> pathToCompletedFile(const std::string& fileReference) const;
- void removeAllTorrentsBut(const std::set<std::string> & filesToRetain);
-
- void signalIfFinishedDownloading(const std::string& fileReference);
-
- std::string infoHash2FileReference(const libtorrent::sha1_hash& hash);
- void setMaxDownloadSpeed(double MBPerSec);
- void setMaxUploadSpeed(double MBPerSec);
- void close();
- bool drained() const { return _outstanding_SRD_requests == 0; }
-
- const std::string _hostName;
- const int _port;
-
- //signals
- DownloadCompletedSignal _downloadCompleted;
- DownloadFailedSignal _downloadFailed; //removed or error
-};
-
-} //namespace filedistribution
-
-
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
deleted file mode 100644
index 1763b798f03..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
+++ /dev/null
@@ -1,136 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "filedownloadermanager.h"
-#include <thread>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".filedownloadermanager");
-
-using namespace std::literals;
-
-using filedistribution::FileDownloaderManager;
-using filedistribution::Path;
-
-namespace {
-void logStartDownload(const std::set<std::string> & filesToDownload) {
- std::ostringstream msg;
- msg << "StartDownloads:" << std::endl;
- std::copy(filesToDownload.begin(), filesToDownload.end(),
- std::ostream_iterator<std::string>(msg, "\n"));
- LOG(debug, "%s", msg.str().c_str());
-}
-} //anonymous namespace
-
-FileDownloaderManager::FileDownloaderManager(
- const std::shared_ptr<FileDownloader>& downloader,
- const std::shared_ptr<FileDistributionModel>& model)
-
- :_fileDownloader(downloader),
- _fileDistributionModel(model),
- _startDownloads(this),
- _setFinishedDownloadingStatus(this)
-{}
-
-FileDownloaderManager::~FileDownloaderManager() {
- LOG(debug, "Deconstructing FileDownloaderManager");
-}
-
-void
-FileDownloaderManager::start()
-{
- _downloadFailedConnection = downloadFailed().connect(
- DownloadFailedSignal::slot_type([&] (const std::string & peer, FileProvider::FailedDownloadReason reason) { (void) reason; removePeerStatus(peer); }).track_foreign(shared_from_this()));
-
- _downloadCompletedConnection = downloadCompleted().connect(
- DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus).track_foreign(shared_from_this()));
-
- _filesToDownloadChangedConnection = _fileDistributionModel->_filesToDownloadChanged.connect(
- FileDistributionModel::FilesToDownloadChangedSignal::slot_type(std::ref(_startDownloads)).track_foreign(shared_from_this()));
-}
-
-boost::optional< Path >
-FileDownloaderManager::getPath(const std::string& fileReference) {
- return _fileDownloader->pathToCompletedFile(fileReference);
-}
-
-void
-FileDownloaderManager::downloadFile(const std::string& fileReference) {
- {
- LockGuard updateFilesToDownloadGuard(_updateFilesToDownloadMutex);
- _startDownloads.downloadFile(fileReference);
- }
-
- //if the file is already downloading but not completed before the above call,
- //the finished download callback might come before the interested party
- //has called connectFinishedDownloadingHandler.
- //An explicit call is therefore used to mitigate this problem:
- //Do not hold updateFilesToDownloadMutex when calling this, as it might cause deadlock.
- _fileDownloader->signalIfFinishedDownloading(fileReference);
-}
-
-void
-FileDownloaderManager::removePeerStatus(const std::string& fileReference) {
- //TODO: Simplify by using separate thread for removal:
- //currently called via StartDownloads which already holds a lock on updateFilesToDownloadMutex.
-
- _fileDistributionModel->removePeer(fileReference);
-}
-
-void
-FileDownloaderManager::StartDownloads::downloadFile(const std::string& fileReference) {
- if (!_parent._fileDownloader->hasTorrent(fileReference)) {
- Buffer torrent(_parent._fileDistributionModel->getFileDBModel().getFile(fileReference));
-
- _parent._fileDistributionModel->addPeer(fileReference);
- _parent._fileDownloader->addTorrent(fileReference, torrent);
- }
-}
-
-
-void
-FileDownloaderManager::StartDownloads::operator()() {
-
- DirectoryGuard::UP guard = _parent._fileDownloader->getGuard();
- LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex);
-
- std::set<std::string> filesToDownload = _parent._fileDistributionModel->getFilesToDownload();
- logStartDownload(filesToDownload);
-
- std::for_each(filesToDownload.begin(), filesToDownload.end(),
- [&] (const std::string& file) { downloadFile(file); });
-
- _parent._fileDownloader->removeAllTorrentsBut(filesToDownload);
-}
-
-FileDownloaderManager::StartDownloads::StartDownloads(FileDownloaderManager* parent)
- :_parent(*parent)
-{}
-
-
-FileDownloaderManager::SetFinishedDownloadingStatus::SetFinishedDownloadingStatus(
- FileDownloaderManager* parent)
- :_parent(*parent)
-{}
-
-void
-FileDownloaderManager::SetFinishedDownloadingStatus::operator()(
- const std::string& fileReference, const Path&) {
-
- //Prevent concurrent modifications to peer node in zk.
- LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex);
-
- try {
- _parent._fileDistributionModel->peerFinished(fileReference);
- } catch (const NotPeer &) { //Probably a concurrent removal of the torrent.
-
- //improve chance of libtorrent session being updated.
- std::this_thread::sleep_for(100ms);
- if (_parent._fileDownloader->hasTorrent(fileReference)) {
-
- _parent._fileDistributionModel->addPeer(fileReference);
- _parent._fileDistributionModel->peerFinished(fileReference);
- } else {
- LOG(debug, "OK: Torrent '%s' finished concurrently with its removal.", fileReference.c_str());
- }
- }
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
deleted file mode 100644
index 107685b4e38..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <boost/signals2/signal.hpp>
-
-#include <vespa/filedistribution/rpc/fileprovider.h>
-#include <vespa/filedistribution/model/filedistributionmodel.h>
-#include "filedownloader.h"
-
-namespace filedistribution {
-
-class FileDownloaderManager : public FileProvider,
- public std::enable_shared_from_this<FileDownloaderManager> {
-
- class StartDownloads {
- FileDownloaderManager& _parent;
- public:
- void operator()();
- void downloadFile(const std::string& fileReference);
- StartDownloads(FileDownloaderManager* parent);
- };
-
- class SetFinishedDownloadingStatus {
- FileDownloaderManager& _parent;
- public:
- void operator()(const std::string& fileReference, const Path&);
- SetFinishedDownloadingStatus(FileDownloaderManager*);
- };
-
- typedef std::lock_guard<std::mutex> LockGuard;
- std::mutex _updateFilesToDownloadMutex;
-
- std::shared_ptr<FileDownloader> _fileDownloader;
- std::shared_ptr<FileDistributionModel> _fileDistributionModel;
- StartDownloads _startDownloads;
- SetFinishedDownloadingStatus _setFinishedDownloadingStatus;
-
- boost::signals2::scoped_connection _downloadFailedConnection;
- boost::signals2::scoped_connection _downloadCompletedConnection;
- boost::signals2::scoped_connection _filesToDownloadChangedConnection;
-
- void removePeerStatus(const std::string& fileReference);
-public:
- using SP = std::shared_ptr<FileDownloaderManager>;
- FileDownloaderManager(const FileDownloaderManager &) = delete;
- FileDownloaderManager & operator = (const FileDownloaderManager &) = delete;
- FileDownloaderManager(const std::shared_ptr<FileDownloader>&,
- const std::shared_ptr<FileDistributionModel>& model);
- ~FileDownloaderManager();
- void start();
-
- boost::optional<Path> getPath(const std::string& fileReference) override;
- void downloadFile(const std::string& fileReference) override;
-
- //FileProvider overrides
- DownloadCompletedSignal& downloadCompleted() override {
- return _fileDownloader->_downloadCompleted;
- }
-
- DownloadFailedSignal& downloadFailed() override {
- return _fileDownloader->_downloadFailed;
- }
-};
-
-} //namespace filedistribution
-
-
diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp b/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp
deleted file mode 100644
index 253cc822b3c..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "hostname.h"
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/net/socket_address.h>
-#include <vespa/log/log.h>
-LOG_SETUP(".hostname");
-
-namespace filedistribution {
-
-std::string
-lookupIPAddress(const std::string& hostName)
-{
- auto best_addr = vespalib::SocketAddress::select_remote(0, hostName.c_str());
- if (!best_addr.valid()) {
- throw filedistribution::FailedResolvingHostName(hostName, VESPA_STRLOC);
- }
- const std::string address = best_addr.ip_address();
- LOG(debug, "Resolved hostname'%s' as '%s'", hostName.c_str(), address.c_str());
- return address;
-}
-
-VESPA_IMPLEMENT_EXCEPTION(FailedResolvingHostName, vespalib::Exception);
-
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.h b/filedistribution/src/vespa/filedistribution/distributor/hostname.h
deleted file mode 100644
index c4aa7c50787..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/hostname.h
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <string>
-#include <vespa/filedistribution/common/exception.h>
-
-namespace filedistribution {
-
-std::string lookupIPAddress(const std::string& hostName);
-
-VESPA_DEFINE_EXCEPTION(FailedResolvingHostName, vespalib::Exception);
-
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
deleted file mode 100644
index fec2e94dbb2..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "scheduler.h"
-
-namespace asio = boost::asio;
-
-using filedistribution::Scheduler;
-typedef Scheduler::Task Task;
-
-Task::Task(Scheduler& scheduler)
- : _timer(scheduler.ioService)
-{}
-
-void
-Task::schedule(asio::deadline_timer::duration_type delay)
-{
- _timer.expires_from_now(delay);
- std::shared_ptr<Task> self = shared_from_this();;
- _timer.async_wait([self](const auto & e) { self->handle(e); });
-}
-
-void
-Task::scheduleNow()
-{
- schedule(boost::posix_time::seconds(0));
-}
-
-void
-Task::handle(const boost::system::error_code& code) {
- if (code != asio::error::operation_aborted) {
- doHandle();
- }
-}
-
-
-Scheduler::Scheduler(std::function<void (asio::io_service&)> callRun)
- :_keepAliveWork(ioService),
- _workerThread([&, callRun]() { callRun(ioService); })
-{}
-
-Scheduler::~Scheduler() {
- ioService.stop();
- _workerThread.join();
- ioService.reset();
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
deleted file mode 100644
index 9ac53656127..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <boost/asio/io_service.hpp>
-#include <boost/asio/deadline_timer.hpp>
-#include <thread>
-
-
-namespace filedistribution {
-
-class Scheduler {
-public:
- class Task : public std::enable_shared_from_this<Task> {
- boost::asio::deadline_timer _timer;
- public:
- typedef std::shared_ptr<Task> SP;
-
- Task(Scheduler& scheduler);
-
- virtual ~Task() {}
-
- void schedule(boost::asio::deadline_timer::duration_type delay);
- void scheduleNow();
-
- void handle(const boost::system::error_code& code);
- protected:
- virtual void doHandle() = 0;
- };
-
-private:
- boost::asio::io_service ioService;
-
- //keeps io_service.run() from exiting until it has been destructed,
- //see http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/reference/io_service.html
- boost::asio::io_service::work _keepAliveWork;
- std::thread _workerThread;
-
-public:
- Scheduler(const Scheduler &) = delete;
- Scheduler & operator = (const Scheduler &) = delete;
- Scheduler(std::function<void (boost::asio::io_service&)> callRun) ;
- ~Scheduler();
-};
-
-}
-
diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp
deleted file mode 100644
index 35ff9e4e8ca..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "signalhandling.h"
-#include <vespa/vespalib/util/signalhandler.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".signalhandling");
-
-typedef vespalib::SignalHandler SIG;
-
-void
-initSignals() {
- SIG::PIPE.ignore();
- SIG::INT.hook();
- SIG::TERM.hook();
- SIG::USR1.hook();
-}
-
-bool
-askedToShutDown() {
- bool result = SIG::INT.check() || SIG::TERM.check();
- if (result) {
- LOG(debug, "Asked to shut down.");
- }
- return result;
-}
-
-bool
-askedToReinitialize() {
- bool result = SIG::USR1.check();
- if (result) {
- LOG(debug, "Asked to reinitialize.");
- }
- return result;
-}
-
-void
-clearReinitializeFlag() {
- SIG::USR1.clear();
-}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h
deleted file mode 100644
index 28713d16aaa..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-void
-initSignals();
-
-bool
-askedToShutDown();
-
-bool
-askedToReinitialize();
-
-void
-clearReinitializeFlag();
-
diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp
deleted file mode 100644
index fbdf8155302..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "state_server_impl.h"
-
-namespace filedistribution {
-
-} // namespace filedistribution
diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h
deleted file mode 100644
index 0017995c776..00000000000
--- a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/vespalib/net/state_server.h>
-#include <vespa/vespalib/net/simple_metrics_producer.h>
-#include <vespa/vespalib/net/simple_health_producer.h>
-#include <vespa/vespalib/net/simple_component_config_producer.h>
-
-namespace filedistribution {
-
-struct StateServerImpl {
- vespalib::SimpleHealthProducer myHealth;
- vespalib::SimpleMetricsProducer myMetrics;
- vespalib::SimpleComponentConfigProducer myComponents;
- vespalib::StateServer myStateServer;
-
- StateServerImpl(int port) : myStateServer(port, myHealth, myMetrics, myComponents) {}
-};
-
-} // namespace filedistribution
diff --git a/filedistribution/src/vespa/filedistribution/manager/.gitignore b/filedistribution/src/vespa/filedistribution/manager/.gitignore
deleted file mode 100644
index b94a4aae8e0..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-.depend
-Makefile
-com_yahoo_vespa_filedistribution_*.h
-*.So
-/libfiledistributionmanager.so.5.1
diff --git a/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt
deleted file mode 100644
index b5798b7b72e..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(filedistribution_filedistributionmanager
- SOURCES
- createtorrent.cpp
- filedb.cpp
- stderr_logfwd.cpp
- $<TARGET_OBJECTS:filedistribution_filedbmodel>
- $<TARGET_OBJECTS:filedistribution_exceptionrethrower>
- INSTALL lib64
- OUTPUT_NAME filedistributionmanager
- DEPENDS
- boost_system${VESPA_BOOST_LIB_SUFFIX}
- boost_thread${VESPA_BOOST_LIB_SUFFIX}
- boost_filesystem${VESPA_BOOST_LIB_SUFFIX}
- zookeeper_mt
- ${JAVA_JVM_LIBRARY}
-)
diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
deleted file mode 100644
index 6de773ebd67..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "createtorrent.h"
-
-#include <libtorrent/torrent_info.hpp>
-#include <boost/filesystem/convenience.hpp>
-
-#include <iostream>
-#include <fstream>
-#include <cmath>
-#include <iterator>
-#include <sstream>
-#include <string>
-
-namespace fs = boost::filesystem;
-
-namespace {
-
-const libtorrent::size_type targetTorrentSize = 64 * 1024;
-
-void
-aggregateFilenames(std::vector<std::string>& accumulator, const fs::path& path, std::string currPrefix)
-{
- if (fs::is_directory(path)) {
- for (fs::directory_iterator i(path), end;
- i != end;
- ++i) {
- std::string newPrefix = currPrefix + "/" + std::string(i->path().filename().c_str());
- accumulator.push_back(newPrefix);
- aggregateFilenames(accumulator, *i, newPrefix);
- }
- }
-}
-
-
-libtorrent::entry
-createEntry(const fs::path& path) {
- if (!fs::exists(path))
- throw std::runtime_error("Path '" + std::string(path.filename().c_str()) + " does not exists");
-
- libtorrent::file_storage fileStorage;
- libtorrent::add_files(fileStorage, path.string());
-
- libtorrent::create_torrent torrent(fileStorage);
- torrent.set_creator("vespa-filedistributor");
- torrent.set_priv(true);
- torrent.add_tracker("");
-
- libtorrent::set_piece_hashes(torrent, path.branch_path().string());
- return torrent.generate();
-}
-
-std::string
-fileReferenceToString(const libtorrent::sha1_hash& fileReference) {
- std::ostringstream fileReferenceString;
- fileReferenceString <<fileReference;
- return fileReferenceString.str();
-}
-
-} //anonymous namespace
-
-filedistribution::
-CreateTorrent::
-CreateTorrent(const Path& path)
- :_path(path),
- _entry(createEntry(_path))
-{}
-
-filedistribution::Buffer
-filedistribution::
-CreateTorrent::
-bencode() const
-{
- Buffer buffer(static_cast<int>(targetTorrentSize));
- libtorrent::bencode(std::back_inserter(buffer), _entry);
- return buffer;
-}
-
-const std::string
-filedistribution::
-CreateTorrent::
-fileReference() const
-{
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
- libtorrent::torrent_info info(_entry);
-#pragma GCC diagnostic pop
- return fileReferenceToString(info.info_hash());
-}
diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.h b/filedistribution/src/vespa/filedistribution/manager/createtorrent.h
deleted file mode 100644
index d84c9ad1d25..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/createtorrent.h
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vector>
-#include <libtorrent/create_torrent.hpp>
-
-#include <vespa/filedistribution/common/buffer.h>
-#include <vespa/filedistribution/common/exception.h>
-
-namespace filedistribution {
-
-class CreateTorrent {
- Path _path;
- libtorrent::entry _entry;
-public:
-
- CreateTorrent(const Path& path);
- Buffer bencode() const;
- const std::string fileReference() const;
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/manager/field.h b/filedistribution/src/vespa/filedistribution/manager/field.h
deleted file mode 100644
index 04fbc286c8d..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/field.h
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <jni.h>
-
-#include <vespa/filedistribution/common/exception.h>
-
-namespace filedistribution {
-
-struct BadFieldException : std::runtime_error {
- explicit BadFieldException(const std::string& name)
- : runtime_error("Could not lookup field '" + name + "'")
- {}
-
- BadFieldException(const BadFieldException& e)
- : runtime_error(e)
- {}
-};
-
-template <class T>
-class LongField {
- jfieldID _fieldID;
-public:
- LongField()
- {}
-
- LongField(jclass clazz, const char* fieldName, JNIEnv* env)
- :_fieldID(env->GetFieldID(clazz, fieldName, "J")) {
-
- if (!_fieldID)
- BOOST_THROW_EXCEPTION(BadFieldException(fieldName));
- }
-
- void set(jobject obj, T value, JNIEnv* env) {
- jlong longValue = reinterpret_cast<long>(value);
- env->SetLongField(obj, _fieldID, longValue);
- }
-
- T get(jobject obj, JNIEnv* env) {
- jlong longValue = env->GetLongField(obj, _fieldID);
- return reinterpret_cast<T>(longValue);
- }
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.cpp b/filedistribution/src/vespa/filedistribution/manager/filedb.cpp
deleted file mode 100644
index bc725684846..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/filedb.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "filedb.h"
-
-#include <boost/filesystem.hpp>
-
-namespace fs = boost::filesystem;
-
-namespace {
-
-void copyDirectory(fs::path original, fs::path destination)
-{
- fs::create_directory(destination);
- for (fs::directory_iterator curr(original), end;
- curr != end;
- ++curr) {
-
- fs::path destPath = destination / curr->path().filename();
- if ( fs::is_directory(curr->status()) ) {
- copyDirectory(*curr, destPath);
- } else {
- fs::copy_file(*curr, destPath);
- }
- }
-}
-
-} //anonymous namespace
-
-namespace filedistribution {
-
-FileDB::FileDB(fs::path dbPath)
- : _dbPath(dbPath) {}
-
-
-bool
-FileDB::add(const DirectoryGuard & directoryGuard, fs::path original, const std::string &name) {
- (void) directoryGuard;
- fs::path finalPath = _dbPath / name;
- fs::path targetPath = _dbPath / (name + ".new");
- if (fs::exists(finalPath) || fs::exists(targetPath)) {
- return false;
- }
- fs::path targetPathTemp = _dbPath / (name + ".tmp");
-
- if (fs::exists(targetPathTemp)) {
- fs::remove_all(targetPathTemp);
- }
-
- fs::create_directory(targetPathTemp);
- if (!fs::is_directory(original)) {
- fs::copy_file(original, targetPathTemp / original.filename());
- } else {
- copyDirectory(original, targetPathTemp / original.filename());
- }
-
- assert(!fs::exists(targetPath));
- fs::rename(targetPathTemp, targetPath);
- return true;
-}
-
-}
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.h b/filedistribution/src/vespa/filedistribution/manager/filedb.h
deleted file mode 100644
index 4ab840a5595..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/filedb.h
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <string>
-#include <vespa/filedistribution/model/filedbmodel.h>
-
-namespace filedistribution {
-
-class FileDB {
- Path _dbPath;
-public:
- FileDB(Path dbPath);
- DirectoryGuard::UP getGuard() { return std::make_unique<DirectoryGuard>(_dbPath); }
- /**
- *
- * @param directoryGuard The guard you need to hold in order to prevent someone fidling with your directory.
- * @param original The file top copy
- * @param name The name the file shall have.
- * @return true if it was added, false if it was already present.
- */
- bool add(const DirectoryGuard & directoryGuard, Path original, const std::string& name);
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
deleted file mode 100644
index f0825343f27..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
+++ /dev/null
@@ -1,188 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "jnistring.h"
-#include "field.h"
-#include "createtorrent.h"
-#include "filedb.h"
-#include <vespa/filedistribution/manager/com_yahoo_vespa_filedistribution_FileDistributionManager.h>
-
-#include <vespa/filedistribution/model/filedistributionmodel.h>
-#include <vespa/filedistribution/model/zkfiledbmodel.h>
-#include <vespa/filedistribution/model/mockfiledistributionmodel.h>
-
-using namespace filedistribution;
-
-namespace fs = boost::filesystem;
-
-namespace {
-
-class NativeFileDistributionManager {
- public:
- std::unique_ptr<FileDBModel> _fileDBModel;
- std::unique_ptr<FileDB> _fileDB;
-};
-
-LongField<NativeFileDistributionManager*> nativeFileDistributionManagerField;
-
-void throwRuntimeException(const char* msg, JNIEnv* env)
-{
- //do not mask active exception.
- if (!env->ExceptionOccurred()) {
- jclass runtimeExceptionClass = env->FindClass("java/lang/RuntimeException");
- if (runtimeExceptionClass) {
- env->ThrowNew(runtimeExceptionClass, msg);
- }
- }
-}
-
-template <class FIELD>
-void deleteField(FIELD& field, jobject self, JNIEnv* env)
-{
- delete field.get(self, env);
- field.set(self, 0, env);
-}
-
-std::unique_ptr<ZKLogging> _G_zkLogging;
-
-} //anonymous namespace
-
-
-#define STANDARDCATCH(returnStatement) \
- catch (const std::bad_alloc&) { \
- std::cerr<<"Error: Out of memory" <<std::endl; \
- /*might fail, therefore also uses stderror message*/ \
- throwRuntimeException("Out of memory", env); \
- returnStatement; \
- } catch(const ZKException& e) { \
- std::stringstream ss; \
- ss << "In" << __FUNCTION__ << ": "; \
- ss << e.what(); \
- throwRuntimeException(ss.str().c_str(), env); \
- returnStatement; \
- } catch(const std::exception& e) { \
- throwRuntimeException(e.what(), env); \
- returnStatement; \
- }
-
-JNIEXPORT
-void JNICALL
-Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setup(
- JNIEnv *env, jclass self)
-{
- try {
- _G_zkLogging = std::make_unique<ZKLogging>();
- nativeFileDistributionManagerField = LongField<NativeFileDistributionManager*>(self, "nativeFileDistributionManager", env);
- } STANDARDCATCH()
-}
-
-
-namespace {
-void initMockFileDBModel(NativeFileDistributionManager& manager)
-{
- manager._fileDBModel.reset(new MockFileDBModel());
-}
-
-void initFileDBModel(NativeFileDistributionManager& manager, const std::string& zkServers)
-{
- //Ignored for now, since we're not installing any watchers.
- std::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers, true));
- manager._fileDBModel.reset(new ZKFileDBModel(zk));
-}
-} //end anonymous namespace
-
-JNIEXPORT
-void JNICALL
-Java_com_yahoo_vespa_filedistribution_FileDistributionManager_init(
- JNIEnv *env, jobject self,
- jbyteArray fileDBPathArg, jbyteArray zkServersArg)
-{
- try {
- JNIString zkServers(zkServersArg, env);
-
- nativeFileDistributionManagerField.set(self, new NativeFileDistributionManager(), env);
- NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env);
- if (zkServers._value == "mockfiledistributionmodel.testing") {
- initMockFileDBModel(manager);
- } else {
- initFileDBModel(manager, zkServers._value);
- }
-
- JNIString fileDBPath(fileDBPathArg, env);
- manager._fileDB.reset(new FileDB(fileDBPath._value));
-
- } STANDARDCATCH()
-}
-
-
-JNIEXPORT
-jstring JNICALL
-Java_com_yahoo_vespa_filedistribution_FileDistributionManager_addFileImpl(
- JNIEnv *env, jobject self,
- jbyteArray completePathArg)
-{
- try {
- JNIString completePath(completePathArg, env);
- CreateTorrent createTorrent(completePath._value);
-
- std::string fileReference = createTorrent.fileReference();
- NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env);
-
- DirectoryGuard::UP guard = manager._fileDB->getGuard();// This prevents the filedistributor from working in an inconsistent state.
- bool freshlyAdded = manager._fileDB->add(*guard, completePath._value, fileReference);
-
- FileDBModel& model = *manager._fileDBModel;
- bool hasRegisteredFile = model.hasFile(fileReference);
- if (! hasRegisteredFile ) {
- model.addFile(fileReference, createTorrent.bencode());
- }
- if (freshlyAdded == hasRegisteredFile) {
- std::cerr << "freshlyAdded(" << freshlyAdded << ") == hasRegisteredFile(" << hasRegisteredFile
- << "), which is very odd. File is '" << fileReference << "'" << std::endl;
- }
-
- //contains string with the characters 0-9 a-f
- return env->NewStringUTF(fileReference.c_str());
- } STANDARDCATCH(return 0)
-}
-
-
-JNIEXPORT
-void JNICALL
-Java_com_yahoo_vespa_filedistribution_FileDistributionManager_shutdown(
- JNIEnv *env, jobject self)
-{
- deleteField(nativeFileDistributionManagerField, self, env);
-}
-
-
-JNIEXPORT
-void JNICALL
-Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setDeployedFilesImpl(
- JNIEnv *env, jobject self, jbyteArray hostNameArg,
- jbyteArray appIdArg, jobjectArray fileReferencesArg)
-{
- try {
- JNIString hostName(hostNameArg, env);
- JNIString appId(appIdArg, env);
- JNIArray<JNIString> fileReferences(fileReferencesArg, env);
-
- nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
- setDeployedFilesToDownload(hostName._value, appId._value, fileReferences._value);
- } STANDARDCATCH()
-}
-
-
-JNIEXPORT
-void JNICALL
-Java_com_yahoo_vespa_filedistribution_FileDistributionManager_removeDeploymentsThatHaveDifferentApplicationIdImpl(
- JNIEnv *env, jobject self, jobjectArray hostNamesArg, jbyteArray appIdArg)
-{
- try {
- JNIArray<JNIString> hostNames(hostNamesArg, env);
- JNIString appId(appIdArg, env);
-
- nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
- removeDeploymentsThatHaveDifferentApplicationId(hostNames._value, appId._value);
- } STANDARDCATCH()
-}
-
diff --git a/filedistribution/src/vespa/filedistribution/manager/jnistring.h b/filedistribution/src/vespa/filedistribution/manager/jnistring.h
deleted file mode 100644
index fbdb2e88aee..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/jnistring.h
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <exception>
-#include <string>
-#include <vector>
-#include <jni.h>
-
-namespace filedistribution {
-
-class JNIString {
- struct Repr {
- JNIEnv* _env;
- jbyteArray _str;
- jint _size;
- char* _repr;
-
- Repr(jbyteArray str, JNIEnv* env)
- :_env(env),
- _str(str),
- _size(_env->GetArrayLength(str)),
- _repr(reinterpret_cast<char*>(_env->GetPrimitiveArrayCritical(str, 0))) {
-
- if (!_repr)
- throw std::bad_alloc();
- }
-
- ~Repr() {
- _env->ReleasePrimitiveArrayCritical(_str, _repr, 0);
- _env->DeleteLocalRef(_str);
- }
- };
-public:
- typedef jbyteArray JavaValue;
- typedef std::string Value;
- Value _value;
-
- JNIString(jbyteArray str, JNIEnv* env) {
- Repr repr(str, env);
- Value result(repr._repr, repr._repr + repr._size);
- _value.swap(result);
- }
-};
-
-class JNIUtf8String {
-public:
- typedef jstring JavaValue;
- typedef std::string Value;
- std::string _value;
-
- JNIUtf8String(jstring str, JNIEnv* env)
- :_value(env->GetStringUTFLength(str), 0)
- {
- env->GetStringUTFRegion(str, 0, _value.begin() - _value.end(), &*_value.begin());
- env->DeleteLocalRef(str);
- }
-};
-
-template <class JNITYPE>
-class JNIArray {
-public:
- std::vector<typename JNITYPE::Value> _value;
-
- JNIArray(jobjectArray array, JNIEnv* env) {
- jsize length = env->GetArrayLength(array);
- _value.reserve(length);
-
- for (jsize i=0; i<length; ++i) {
- jobject element = env->GetObjectArrayElement(array, i);
- JNITYPE elementValue(static_cast<typename JNITYPE::JavaValue>(element), env);
- _value.push_back(elementValue._value);
- }
- env->DeleteLocalRef(array);
- }
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp b/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp
deleted file mode 100644
index 5273fb562cf..00000000000
--- a/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/filedistribution/common/logfwd.h>
-
-#include <iostream>
-#include <vector>
-
-
-void filedistribution::logfwd::log_forward(LogLevel level, const char* file, int line, const char* fmt, ...)
-{
- if (level == debug || level == info)
- return;
-
- const size_t maxSize(0x8000);
- std::vector<char> payload(maxSize);
- char * buf = &payload[0];
-
- va_list args;
- va_start(args, fmt);
- vsnprintf(buf, maxSize, fmt, args);
- va_end(args);
-
- std::cerr <<"Error: " << buf <<" File: " <<file <<" Line: " <<line <<std::endl;
-}
diff --git a/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt
deleted file mode 100644
index 51af6b106dc..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(filedistribution_filedbmodel OBJECT
- SOURCES
- zkfiledbmodel.cpp
- zkfacade.cpp
- deployedfilestodownload.cpp
- DEPENDS
-)
-vespa_add_library(filedistribution_filedistributionmodel STATIC
- SOURCES
- deployedfilestodownload.cpp
- filedistributionmodelimpl.cpp
- zkfacade.cpp
- zkfiledbmodel.cpp
- DEPENDS
-)
-vespa_add_target_external_dependency(filedistribution_filedistributionmodel zookeeper_mt)
-
diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
deleted file mode 100644
index 13daecbe5c1..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "deployedfilestodownload.h"
-#include <vespa/filedistribution/common/logfwd.h>
-
-using filedistribution::DeployedFilesToDownload;
-using filedistribution::Path;
-
-typedef std::vector<std::string> StringVector;
-
-namespace filedistribution {
-
-const Path getApplicationIdPath(const Path & parent) { return parent / "appId"; }
-
-const std::string
-readApplicationId(filedistribution::ZKFacade & zk, const Path & deployNode)
-{
- if (zk.hasNode(getApplicationIdPath(deployNode))) {
- return zk.getString(getApplicationIdPath(deployNode));
- }
- return "default:default:default";
-}
-
-}
-
-const Path
-DeployedFilesToDownload::addNewDeployNode(Path parentPath, const FileReferences& files) {
- Path path = parentPath / "deploy_";
-
- std::ostringstream filesStream;
- if (!files.empty()) {
- filesStream << files[0];
- std::for_each(files.begin() +1, files.end(), [&](const auto & v) { filesStream << '\n' << v; });
- }
- Path retPath = _zk.createSequenceNode(path, filesStream.str().c_str(), filesStream.str().length());
- return retPath;
-}
-
-void
-DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath) {
- std::map<std::string, StringVector> childrenPerId(groupChildrenByAppId(parentPath, _zk.getChildren(parentPath)));
- for (auto & kv : childrenPerId) {
- deleteExpiredDeployNodes(parentPath, kv.second);
- }
-}
-
-std::map<std::string, StringVector>
-DeployedFilesToDownload::groupChildrenByAppId(const Path & parentPath, const StringVector & children)
-{
- std::map<std::string, StringVector> childrenById;
- std::for_each(std::begin(children), std::end(children),
- [&](const std::string & childName)
- {
- Path childPath = parentPath / childName;
- std::string appId(readApplicationId(_zk, childPath));
- childrenById[appId].push_back(childName);
- });
- return childrenById;
-}
-
-void
-DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath, StringVector children)
-{
- if (children.size() > numberOfDeploymentsToKeepFilesFrom) {
- std::sort(children.begin(), children.end());
-
- size_t numberOfNodesToDelete = children.size() - numberOfDeploymentsToKeepFilesFrom;
- std::for_each(children.begin(), children.begin() + numberOfNodesToDelete,
- [&](const std::string & s) {_zk.remove(parentPath / s); });
- }
-}
-
-void
-DeployedFilesToDownload::addAppIdToDeployNode(const Path & deployNode, const std::string & appId)
-{
- _zk.setData(getApplicationIdPath(deployNode), appId.c_str(), appId.length());
-}
-
-void
-DeployedFilesToDownload::setDeployedFilesToDownload(
- const std::string& hostName,
- const std::string& applicationId,
- const FileReferences& files) {
- Path parentPath = getPath(hostName);
- _zk.setData(parentPath, "", 0);
-
- const Path deployNode(addNewDeployNode(parentPath, files));
- addAppIdToDeployNode(deployNode, applicationId);
- deleteExpiredDeployNodes(parentPath);
-}
-
-//Nothrow
-template <typename INSERT_ITERATOR>
-void
-DeployedFilesToDownload::readDeployFile(const Path& path, INSERT_ITERATOR insertionIterator) {
- LOGFWD(debug, "Reading deploy file '%s", path.string().c_str());
-
-
- try {
- Buffer buffer(_zk.getData(path));
- std::string stringBuffer(buffer.begin(), buffer.end());
- std::istringstream stream(stringBuffer);
-
- typedef std::istream_iterator<std::string> iterator;
- std::copy(iterator(stream), iterator(), insertionIterator);
- } catch (const ZKNodeDoesNotExistsException& e) {
- //Node deleted, no problem.
- LOGFWD(debug, "Deploy file '%s' deleted.", path.string().c_str());
- }
-}
-
-const DeployedFilesToDownload::FileReferences
-DeployedFilesToDownload::getDeployedFilesToDownload(
- const std::string& hostName,
- const ZKFacade::NodeChangedWatcherSP& watcher) {
-
- try {
- StringVector deployedFiles = _zk.getChildren(getPath(hostName), watcher);
- FileReferences fileReferences;
-
- for (StringVector::iterator i = deployedFiles.begin(); i != deployedFiles.end(); ++i) {
- readDeployFile(getPath(hostName) / *i, std::back_inserter(fileReferences));
- }
-
- return fileReferences;
- } catch (const ZKNodeDoesNotExistsException&) {
- //Add watch waiting for the node to appear:
- if (_zk.hasNode(getPath(hostName), watcher)) {
- return getDeployedFilesToDownload(hostName, watcher);
- } else {
- return FileReferences();
- }
- }
-}
-
-const DeployedFilesToDownload::FileReferences
-DeployedFilesToDownload::getLatestDeployedFilesToDownload(const std::string& hostName)
-{
- StringVector deployedFiles = _zk.getChildren(getPath(hostName));
- std::sort(deployedFiles.begin(), deployedFiles.end());
-
- FileReferences fileReferences;
- if (deployedFiles.empty()) {
- return fileReferences;
- } else {
- readDeployFile(getPath(hostName) / *deployedFiles.rbegin(), std::back_inserter(fileReferences));
- return fileReferences;
- }
-}
diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h
deleted file mode 100644
index fc726170084..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "zkfacade.h"
-#include "zkfiledbmodel.h"
-
-namespace filedistribution {
-
-const std::string readApplicationId(ZKFacade & zk, const Path & deployNode);
-
-class DeployedFilesToDownload {
- //includes the current deployment. Want 2 * number of config models, since deployment is per model
- static const size_t numberOfDeploymentsToKeepFilesFrom = 14;
-
- ZKFacade& _zk;
-
- Path getPath(const std::string& hostName) {
- return ZKFileDBModel::_hostsPath / hostName;
- }
-
- //Nothrow
- template <typename INSERT_ITERATOR>
- void readDeployFile(const Path& path, INSERT_ITERATOR insertionIterator);
- void addAppIdToDeployNode(const Path & deployNode, const std::string & appId);
- std::map<std::string, std::vector<std::string> > groupChildrenByAppId(const Path & parentPath, const std::vector<std::string> & children);
- void deleteExpiredDeployNodes(Path parentPath, std::vector<std::string> children);
-
-public:
- typedef std::vector<std::string> FileReferences;
-
- DeployedFilesToDownload(ZKFacade* zk)
- :_zk(*zk)
- {}
-
- const Path addNewDeployNode(Path parentPath, const FileReferences& files);
-
- void deleteExpiredDeployNodes(Path parentPath);
-
- void setDeployedFilesToDownload(
- const std::string& hostName,
- const std::string& applicationId,
- const FileReferences& files);
-
- /** For all the deploys available **/
- const FileReferences getDeployedFilesToDownload(
- const std::string& hostName,
- const ZKFacade::NodeChangedWatcherSP& watcher);
-
- /** For the current deploy only **/
- const FileReferences getLatestDeployedFilesToDownload(const std::string& hostName);
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/model/filedbmodel.h b/filedistribution/src/vespa/filedistribution/model/filedbmodel.h
deleted file mode 100644
index c556c703b6d..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/filedbmodel.h
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/filedistribution/common/buffer.h>
-#include <vespa/filedistribution/common/exception.h>
-
-namespace filedistribution {
-
-class DirectoryGuard {
-public:
- typedef std::unique_ptr<DirectoryGuard> UP;
- DirectoryGuard(Path path);
- ~DirectoryGuard();
-private:
- int _fd;
-};
-
-VESPA_DEFINE_EXCEPTION(InvalidProgressException, vespalib::Exception);
-VESPA_DEFINE_EXCEPTION(InvalidHostStatusException, vespalib::Exception);
-
-class FileDBModel {
-public:
- struct HostStatus {
- enum State { finished, inProgress, notStarted };
-
- State _state;
- size_t _numFilesToDownload;
- size_t _numFilesFinished;
- };
-
- FileDBModel(const FileDBModel &) = delete;
- FileDBModel & operator = (const FileDBModel &) = delete;
- FileDBModel() = default;
- virtual ~FileDBModel();
-
- virtual bool hasFile(const std::string& fileReference) = 0;
- virtual void addFile(const std::string& fileReference, const Buffer& buffer) = 0;
- virtual Buffer getFile(const std::string& fileReference) = 0;
- virtual void cleanFiles(const std::vector<std::string>& filesToPreserve) = 0;
-
- virtual void setDeployedFilesToDownload(const std::string& hostName,
- const std::string & appId,
- const std::vector<std::string> & files) = 0;
- virtual void cleanDeployedFilesToDownload(
- const std::vector<std::string> & hostsToPreserve,
- const std::string& appId) = 0;
- virtual void removeDeploymentsThatHaveDifferentApplicationId(
- const std::vector<std::string> & hostsToPreserve,
- const std::string& appId) = 0;
- virtual std::vector<std::string> getHosts() = 0;
-
- virtual HostStatus getHostStatus(const std::string& hostName) = 0;
- //TODO: does not really belong here, refactor.
- typedef std::vector<int8_t> Progress; // [0-100]
- virtual Progress getProgress(const std::string& fileReference,
- const std::vector<std::string>& hostsSortedAscending) = 0;
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h
deleted file mode 100644
index 1b0ed9f4826..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vector>
-#include <memory>
-#include <string>
-#include <set>
-
-#include <boost/signals2.hpp>
-
-#include <libtorrent/peer.hpp>
-#include <vespa/filedistribution/common/buffer.h>
-#include <vespa/filedistribution/common/exception.h>
-#include "filedbmodel.h"
-
-namespace filedistribution {
-
-VESPA_DEFINE_EXCEPTION(NotPeer, vespalib::Exception);
-
-class FileDistributionModel {
-public:
- typedef boost::signals2::signal<void ()> FilesToDownloadChangedSignal;
- typedef std::vector<libtorrent::peer_entry> PeerEntries;
-
- virtual FileDBModel& getFileDBModel() = 0;
-
- virtual std::set<std::string> getFilesToDownload() = 0;
-
- virtual PeerEntries getPeers(const std::string& fileReference, size_t maxPeers) = 0;
- virtual void addPeer(const std::string& fileReference) = 0;
- virtual void removePeer(const std::string& fileReference) = 0;
- virtual void peerFinished(const std::string& fileReference) = 0; //throws NotPeer
-
- FileDistributionModel(const FileDistributionModel &) = delete;
- FileDistributionModel & operator = (const FileDistributionModel &) = delete;
- FileDistributionModel() = default;
- virtual ~FileDistributionModel() {}
-
- FilesToDownloadChangedSignal _filesToDownloadChanged;
-};
-
-} //namespace filedistribution
-
-
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
deleted file mode 100644
index e0338cffd52..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "filedistributionmodel.h"
-#include "zkfiledbmodel.h"
-#include "deployedfilestodownload.h"
-#include "filedistributionmodelimpl.h"
-#include <boost/filesystem.hpp>
-#include <zookeeper/zookeeper.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".filedistributionmodel");
-
-namespace fs = boost::filesystem;
-
-using filedistribution::ZKFileDBModel;
-using std::make_shared;
-
-namespace {
-//peer format: hostName:port
-
-void
-addPeerEntry(const std::string& peer,
- filedistribution::FileDistributionModelImpl::PeerEntries& result) {
-
- try {
- libtorrent::peer_entry peerEntry;
- peerEntry.pid.clear();
-
- std::istringstream stream(peer);
- stream.exceptions ( std::istream::failbit | std::istream::badbit );
-
- std::getline(stream, peerEntry.ip, ZKFileDBModel::_peerEntrySeparator);
- stream >> peerEntry.port;
-
- result.push_back(peerEntry);
- } catch (const std::exception&) {
- LOG(warning, "Invalid peer entry: '%s'", peer.c_str());
- //Ignore invalid peer entries
- }
-}
-
-std::vector<std::string>::iterator
-prunePeers(std::vector<std::string> &peers, size_t maxPeers) {
- if (peers.size() <= maxPeers)
- return peers.end();
-
- assert (maxPeers < 2147483648); //due to the usage of rand()
-
- const size_t peersSize = peers.size();
- for (size_t i=0; i<maxPeers; ++i) {
- //i <= i + (std::rand() % ( peersSize -i )) <= peerSize - 1
- size_t candidateBetween_i_and_peerSize = i + ( std::rand() % ( peersSize -i ));
- std::swap(peers[i], peers[candidateBetween_i_and_peerSize]);
- }
-
- return peers.begin() + maxPeers;
-}
-
-} //anonymous namespace
-
-namespace filedistribution {
-
-VESPA_IMPLEMENT_EXCEPTION(NotPeer, vespalib::Exception);
-
-}
-
-using filedistribution::FileDistributionModelImpl;
-
-struct FileDistributionModelImpl::DeployedFilesChangedCallback :
- public ZKFacade::NodeChangedWatcher
-{
- typedef std::shared_ptr<DeployedFilesChangedCallback> SP;
-
- std::weak_ptr<FileDistributionModelImpl> _parent;
-
- DeployedFilesChangedCallback(const std::shared_ptr<FileDistributionModelImpl> & parent)
- :_parent(parent)
- {}
-
- //override
- void operator()() override {
- if (std::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) {
- model->_filesToDownloadChanged();
- }
- }
-};
-
-
-FileDistributionModelImpl::~FileDistributionModelImpl() {
- LOG(debug, "Deconstructing FileDistributionModelImpl");
-}
-
-FileDistributionModelImpl::PeerEntries
-FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t maxPeers) {
- try {
- fs::path path = _fileDBModel.getPeersPath(fileReference);
-
- typedef std::vector<std::string> Peers;
- Peers peers = _zk->getChildren(path);
- // TODO: Take this port from some config somewhere instead of hardcoding
- addConfigServersAsPeers(peers, getenv("services__addr_configserver"), 19093);
-
- Peers::iterator end = prunePeers(peers, maxPeers);
-
- PeerEntries result;
- result.reserve(end - peers.begin());
-
- std::for_each(peers.begin(), end, [&] (const std::string & s) { addPeerEntry(s, result); });
-
- LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str());
- return result;
- } catch(ZKNodeDoesNotExistsException&) {
- LOG(debug, "No peer entries available for '%s'", fileReference.c_str());
- return PeerEntries();
- }
-}
-
-fs::path
-FileDistributionModelImpl::getPeerEntryPath(const std::string& fileReference) {
- std::ostringstream entry;
- entry <<_hostName << ZKFileDBModel::_peerEntrySeparator <<_port;
-
- return _fileDBModel.getPeersPath(fileReference) / entry.str();
-}
-
-void
-FileDistributionModelImpl::addPeer(const std::string& fileReference) {
- fs::path path = getPeerEntryPath(fileReference);
- LOG(debug, "Adding peer '%s'", path.string().c_str());
-
- if (_zk->hasNode(path)) {
- LOG(info, "Retiring previous peer node owner.");
- _zk->removeIfExists(path);
- }
- _zk->addEphemeralNode(path);
-}
-
-void
-FileDistributionModelImpl::removePeer(const std::string& fileReference) {
- fs::path path = getPeerEntryPath(fileReference);
- LOG(debug, "Removing peer '%s'", path.string().c_str());
-
- _zk->removeIfExists(path);
-}
-
-//Assumes that addPeer has been called before the torrent was started,
-//so that we avoid the race condition between finishing downloading a torrent
-//and setting peer status
-void
-FileDistributionModelImpl::peerFinished(const std::string& fileReference) {
- fs::path path = getPeerEntryPath(fileReference);
- LOG(debug, "Peer finished '%s'", path.string().c_str());
-
- try {
- bool mustExist = true;
- char progress = 100; //percent
-
- _zk->setData(path, &progress, sizeof(char), mustExist);
- } catch(ZKNodeDoesNotExistsException & e) {
- NotPeer(fileReference, e, VESPA_STRLOC);
- }
-}
-
-std::set<std::string>
-FileDistributionModelImpl::getFilesToDownload() {
- DeployedFilesToDownload d(_zk.get());
- std::vector<std::string> deployed = d.getDeployedFilesToDownload(_hostName,
- make_shared<DeployedFilesChangedCallback>(shared_from_this()));
-
- std::set<std::string> result(deployed.begin(), deployed.end());
-
- {
- LockGuard guard(_activeFileReferencesMutex);
- result.insert(_activeFileReferences.begin(), _activeFileReferences.end());
- }
- return result;
-}
-
-bool
-FileDistributionModelImpl::updateActiveFileReferences(
- const std::vector<vespalib::string>& fileReferences) {
-
- std::vector<vespalib::string> sortedFileReferences(fileReferences);
- std::sort(sortedFileReferences.begin(), sortedFileReferences.end());
-
- LockGuard guard(_activeFileReferencesMutex);
- bool changed = sortedFileReferences != _activeFileReferences;
-
- sortedFileReferences.swap(_activeFileReferences);
- return changed;
-}
-
-void
-FileDistributionModelImpl::addConfigServersAsPeers(
- std::vector<std::string> & peers, char const* envConfigServers, int port) {
-
- std::set<std::string> peersFromTracker(peers.begin(), peers.end());
-
- if (envConfigServers == NULL) {
- // Could be standalone cluster (not set for this).
- return;
- }
- std::string configServerCommaListed(envConfigServers);
- std::stringstream configserverstream(configServerCommaListed);
- std::string configserver;
- while (std::getline(configserverstream, configserver, ',')) {
- configserver += ":" + std::to_string(port);
- if (peersFromTracker.find(configserver) == peersFromTracker.end()) {
- peers.push_back(configserver);
- LOG(debug, "Adding configserver '%s'", configserver.c_str());
- } else {
- LOG(debug, "Configserver already added '%s'", configserver.c_str());
- }
- }
-}
-
-void
-FileDistributionModelImpl::configure(std::unique_ptr<FilereferencesConfig> config) {
- const bool changed = updateActiveFileReferences(config->filereferences);
- if (changed) {
- try {
- _filesToDownloadChanged();
- } catch (const ZKConnectionLossException & e) {
- LOG(info, "Connection loss in reconfigure of file references, resuming. %s", e.what());
- } catch (const ZKOperationTimeoutException & e) {
- LOG(warning, "Operation timed out in reconfigure of file references. "
- "Will do quick exit to start a clean sheet. %s", e.what());
- std::quick_exit(41);
- }
- }
-}
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
deleted file mode 100644
index 5374040a7f1..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "filedistributionmodel.h"
-#include <vespa/config-filereferences.h>
-#include "zkfacade.h"
-#include "zkfiledbmodel.h"
-#include <vespa/config/config.h>
-
-using cloud::config::filedistribution::FilereferencesConfig;
-
-namespace filedistribution {
-
-class FileDistributionModelImpl : public FileDistributionModel,
- public config::IFetcherCallback<FilereferencesConfig>,
- public std::enable_shared_from_this<FileDistributionModelImpl>
-{
- struct DeployedFilesChangedCallback;
-
- const std::string _hostName;
- const int _port;
-
- const std::shared_ptr<ZKFacade> _zk;
- ZKFileDBModel _fileDBModel;
-
- std::mutex _activeFileReferencesMutex;
- typedef std::lock_guard<std::mutex> LockGuard;
- std::vector<vespalib::string> _activeFileReferences;
-
- bool /*changed*/
- updateActiveFileReferences(const std::vector<vespalib::string>& fileReferences);
-
- Path getPeerEntryPath(const std::string& fileReference);
-public:
- FileDistributionModelImpl(const std::string& hostName, int port, const std::shared_ptr<ZKFacade>& zk)
- :_hostName(hostName),
- _port(port),
- _zk(zk),
- _fileDBModel(_zk)
- {
- /* Hack: Force the first call to updateActiveFileReferences to return changed=true
- when the file references config is empty.
- This ensures that the "deployed files to download" nodes in zookeeper are read at start up.
- */
- _activeFileReferences.push_back("force-initial-files-to-download-changed-signal");
- }
-
- ~FileDistributionModelImpl();
-
- //overrides FileDistributionModel
- FileDBModel& getFileDBModel() override {
- return _fileDBModel;
- }
-
- std::set<std::string> getFilesToDownload() override;
-
- PeerEntries getPeers(const std::string& fileReference, size_t maxPeers) override;
- void addPeer(const std::string& fileReference) override;
- void removePeer(const std::string& fileReference) override;
- void peerFinished(const std::string& fileReference) override;
- void addConfigServersAsPeers(std::vector<std::string>& peers, char const* envConfigServer, int port);
-
- //Overrides Subscriber
- void configure(std::unique_ptr<FilereferencesConfig> config) override;
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h
deleted file mode 100644
index 70f9472caab..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "filedistributionmodel.h"
-#include <algorithm>
-#include <vector>
-
-namespace filedistribution {
-
-class MockFileDBModel : public FileDBModel {
- std::vector<std::string> _fileReferences;
-public:
- bool hasFile(const std::string& fileReference) override {
- return std::find(_fileReferences.begin(), _fileReferences.end(), fileReference) != _fileReferences.end();
- }
-
- void addFile(const std::string& fileReference, const Buffer & buffer) override {
- (void)buffer;
- _fileReferences.push_back(fileReference);
- }
-
- Buffer getFile(const std::string& fileReference) override {
- (void)fileReference;
- const char* resultStr = "result";
- Buffer result(resultStr, resultStr + strlen(resultStr));
- return result;
- }
-
- void cleanFiles(const std::vector<std::string> &) override {}
-
- void setDeployedFilesToDownload(const std::string&, const std::string&,
- const std::vector<std::string> &) override {}
- void cleanDeployedFilesToDownload(const std::vector<std::string> &,
- const std::string&) override {}
- void removeDeploymentsThatHaveDifferentApplicationId(const std::vector<std::string> &,
- const std::string&) override {}
-
- std::vector<std::string> getHosts() override {
- return std::vector<std::string>();
- }
-
- HostStatus getHostStatus(const std::string&) override {
- return HostStatus();
- }
-
- Progress getProgress(const std::string&, const std::vector<std::string>&) override {
- return Progress();
- }
-};
-
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
deleted file mode 100644
index de410289ec0..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
+++ /dev/null
@@ -1,605 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "zkfacade.h"
-#include <vespa/vespalib/net/socket_address.h>
-#include <vespa/filedistribution/common/logfwd.h>
-#include <vespa/defaults.h>
-#include <vespa/vespalib/util/gate.h>
-#include <vespa/vespalib/text/stringtokenizer.h>
-#include <zookeeper/zookeeper.h>
-#include <sstream>
-#include <thread>
-#include <boost/function_output_iterator.hpp>
-
-typedef std::unique_lock<std::mutex> UniqueLock;
-
-using filedistribution::ZKFacade;
-using filedistribution::Buffer;
-using filedistribution::ZKGenericException;
-using filedistribution::ZKException;
-using filedistribution::ZKLogging;
-using filedistribution::Path;
-
-namespace {
-
-std::string
-toErrorMsg(int zkStatus) {
- switch(zkStatus) {
- //System errors
- case ZRUNTIMEINCONSISTENCY:
- return "Zookeeper: A runtime inconsistency was found(ZRUNTIMEINCONSISTENCY)";
- case ZDATAINCONSISTENCY:
- return "Zookeeper: A data inconsistency was found(ZDATAINCONSISTENCY)";
- case ZCONNECTIONLOSS:
- return "Zookeeper: Connection to the server has been lost(ZCONNECTIONLOSS)";
- case ZMARSHALLINGERROR:
- return "Zookeeper: Error while marshalling or unmarshalling data(ZMARSHALLINGERROR)";
- case ZUNIMPLEMENTED:
- return "Zookeeper: Operation is unimplemented(ZUNIMPLEMENTED)";
- case ZOPERATIONTIMEOUT:
- return "Zookeeper: Operation timeout(ZOPERATIONTIMEOUT)";
- case ZBADARGUMENTS:
- return "Zookeeper: Invalid arguments(ZBADARGUMENTS)";
- case ZINVALIDSTATE:
- return "Zookeeper: The connection with the zookeeper servers timed out(ZINVALIDSTATE).";
-
- //API errors
- case ZNONODE:
- return "Zookeeper: Node does not exist(ZNONODE)";
- case ZNOAUTH:
- return "Zookeeper: Not authenticated(ZNOAUTH)";
- case ZBADVERSION:
- return "Zookeeper: Version conflict(ZBADVERSION)";
- case ZNOCHILDRENFOREPHEMERALS:
- return "Zookeeper: Ephemeral nodes may not have children(ZNOCHILDRENFOREPHEMERALS)";
- case ZNODEEXISTS:
- return "Zookeeper: The node already exists(ZNODEEXISTS)";
- case ZNOTEMPTY:
- return "Zookeeper: The node has children(ZNOTEMPTY)";
- case ZSESSIONEXPIRED:
- return "Zookeeper: The session has been expired by the server(ZSESSIONEXPIRED)";
- case ZINVALIDCALLBACK:
- return "Zookeeper: Invalid callback specified(ZINVALIDCALLBACK)";
- case ZINVALIDACL:
- return "Zookeeper: Invalid ACL specified(ZINVALIDACL)";
- case ZAUTHFAILED:
- return "Zookeeper: Client authentication failed(ZAUTHFAILED)";
- case ZCLOSING:
- return "Zookeeper: ZooKeeper is closing(ZCLOSING)";
- case ZNOTHING:
- return "Zookeeper: No server responses to process(ZNOTHING)";
- default:
- LOGFWD(error, "In ZKGenericException::what(): Invalid error code %d", zkStatus);
- return "Zookeeper: Invalid error code.";
- }
-}
-
-class RetryController {
- unsigned int _retryCount;
- ZKFacade& _zkFacade;
-
- static const unsigned int _maxRetries = 10;
-public:
- int _lastStatus;
-
- RetryController(ZKFacade* zkFacade)
- :_retryCount(0),
- _zkFacade(*zkFacade),
- _lastStatus(0)
- {}
-
- void operator()(int status) {
- _lastStatus = status;
- }
-
- bool shouldRetry() {
- ++_retryCount;
-
- return _zkFacade.retriesEnabled() &&
- _lastStatus != ZOK &&
- _retryCount < _maxRetries &&
- isNonLastingError(_lastStatus) &&
- pause();
- }
-
- bool isNonLastingError(int error) {
- return error == ZCONNECTIONLOSS ||
- error == ZOPERATIONTIMEOUT;
- }
-
- bool pause() {
- unsigned int sleepInSeconds = 1;
- sleep(sleepInSeconds);
- LOGFWD(info, "Retrying zookeeper operation.");
- return true;
- }
-
- void throwIfError(const Path & path) {
- namespace fd = filedistribution;
-
- switch (_lastStatus) {
- case ZSESSIONEXPIRED:
- throw fd::ZKSessionExpired(path.string(), VESPA_STRLOC);
- case ZNONODE:
- throw fd::ZKNodeDoesNotExistsException(path.string(), VESPA_STRLOC);
- case ZNODEEXISTS:
- throw fd::ZKNodeExistsException(path.string(), VESPA_STRLOC);
- case ZCONNECTIONLOSS:
- throw fd::ZKConnectionLossException(path.string(), VESPA_STRLOC);
- case ZOPERATIONTIMEOUT:
- throw fd::ZKOperationTimeoutException(path.string(), VESPA_STRLOC);
- default:
- if (_lastStatus != ZOK) {
- throw fd::ZKGenericException(_lastStatus, toErrorMsg(_lastStatus) + " : " + path.string(), VESPA_STRLOC);
- }
- }
- }
-};
-
-class DeallocateZKStringVectorGuard {
- String_vector& _strings;
-public:
- DeallocateZKStringVectorGuard(String_vector& strings)
- :_strings(strings)
- {}
-
- ~DeallocateZKStringVectorGuard() {
- deallocate_String_vector(&_strings);
- }
-};
-
-const Path
-setDataForNewFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle, int createFlags) {
-
- RetryController retryController(&zk);
- const int maxPath = 1024;
- char createdPath[maxPath];
- do {
- retryController( zoo_create(zhandle, path.string().c_str(), buffer, length, &ZOO_OPEN_ACL_UNSAFE, createFlags, createdPath, maxPath));
- } while (retryController.shouldRetry());
- Path newPath(createdPath);
- retryController.throwIfError(newPath);
- return newPath;
-}
-
-void
-setDataForExistingFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle) {
- RetryController retryController(&zk);
-
- const int ignoreVersion = -1;
- do {
- retryController(zoo_set(zhandle, path.string().c_str(), buffer, length, ignoreVersion));
- } while (retryController.shouldRetry());
-
- retryController.throwIfError(path);
-}
-
-} //anonymous namespace
-
-namespace filedistribution {
-
-VESPA_IMPLEMENT_EXCEPTION(ZKNodeDoesNotExistsException, ZKException);
-VESPA_IMPLEMENT_EXCEPTION(ZKConnectionLossException, ZKException);
-VESPA_IMPLEMENT_EXCEPTION(ZKNodeExistsException, ZKException);
-VESPA_IMPLEMENT_EXCEPTION(ZKFailedConnecting, ZKException);
-VESPA_IMPLEMENT_EXCEPTION(ZKSessionExpired, ZKException);
-VESPA_IMPLEMENT_EXCEPTION(ZKOperationTimeoutException, ZKException);
-VESPA_IMPLEMENT_EXCEPTION_SPINE(ZKGenericException);
-
-}
-
-/********** Active watchers *******************************************/
-struct ZKFacade::ZKWatcher {
- const std::weak_ptr<ZKFacade> _owner;
- const NodeChangedWatcherSP _nodeChangedWatcher;
-
- ZKWatcher(
- const std::shared_ptr<ZKFacade> &owner,
- const NodeChangedWatcherSP& nodeChangedWatcher )
- :_owner(owner),
- _nodeChangedWatcher(nodeChangedWatcher)
- {}
-
- static void watcherFn(zhandle_t *zh, int type,
- int state, const char *path,void *watcherContext) {
-
- (void)zh;
- (void)state;
- (void)path;
-
- if (type == ZOO_SESSION_EVENT) {
- //The session events do not cause unregistering of the watcher
- //inside zookeeper, so don't unregister it here.
- LOGFWD(debug, "ZKWatcher recieved session event with state '%d'. Ignoring", state);
- return;
- }
-
- LOGFWD(debug, "ZKWatcher: Begin watcher called for path '%s' with type %d.", path, type);
-
- ZKWatcher* self = static_cast<ZKWatcher*>(watcherContext);
-
- //WARNING: Since we're creating a shared_ptr to ZKFacade here, this might cause
- //destruction of the ZKFacade in a zookeeper thread.
- //Since zookeeper_close blocks until all watcher threads are finished, and we're inside a watcher thread,
- //this will cause infinite waiting.
- //To avoid this, a custom shared_ptr deleter using a separate deleter thread must be used.
-
- if (std::shared_ptr<ZKFacade> zk = self->_owner.lock()) {
- zk->invokeWatcher(watcherContext);
- }
-
- LOGFWD(debug, "ZKWatcher: End watcher called for path '%s' with type %d.", path, type);
- }
-};
-
-void
-ZKFacade::stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context) {
- (void)context;
-
- //The ZKFacade won't expire before zookeeper_close has finished.
- try {
- if (type == ZOO_SESSION_EVENT) {
- LOGFWD(debug, "Zookeeper session event: %d", state);
- if (state == ZOO_EXPIRED_SESSION_STATE) {
- throw ZKSessionExpired(path, VESPA_STRLOC);
- } else if (state == ZOO_AUTH_FAILED_STATE) {
- throw ZKGenericException(ZNOAUTH, path, VESPA_STRLOC);
- }
- } else {
- LOGFWD(info, "State watching function: Unexpected event: '%d' -- '%d' ", type, state);
- }
- } catch (ZKSessionExpired & e) {
- LOGFWD(error, "Received ZKSessionExpired exception that I can not handle. Will just exit quietly : %s", e.what());
- std::quick_exit(11);
- }
-}
-
-
-void* /* watcherContext */
-ZKFacade::registerWatcher(const NodeChangedWatcherSP& watcher) {
-
- UniqueLock lock(_watchersMutex);
- std::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher));
- _watchers[zkWatcher.get()] = zkWatcher;
- return zkWatcher.get();
-}
-
-
-std::shared_ptr<ZKFacade::ZKWatcher>
-ZKFacade::unregisterWatcher(void* watcherContext) {
- UniqueLock lock(_watchersMutex);
-
- WatchersMap::iterator i = _watchers.find(watcherContext);
- if (i == _watchers.end()) {
- return std::shared_ptr<ZKWatcher>();
- } else {
- std::shared_ptr<ZKWatcher> result = i->second;
- _watchers.erase(i);
- return result;
- }
-}
-
-void
-ZKFacade::invokeWatcher(void* watcherContext) {
- std::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext);
-
- if (!_watchersEnabled)
- return;
-
- if (watcher) {
- try {
- (*watcher->_nodeChangedWatcher)();
- } catch (const ZKConnectionLossException & e) {
- LOGFWD(error, "Got connection loss exception while invoking watcher : %s", e.what());
- std::quick_exit(12);
- }
- } else {
- LOGFWD(error, "Invoke called on expired watcher.");
- }
-}
-
-/********** End live watchers ***************************************/
-
-std::string
-ZKFacade::getValidZKServers(const std::string &input, bool ignoreDNSFailure) {
- if (ignoreDNSFailure) {
- vespalib::StringTokenizer tokenizer(input, ",");
- vespalib::string validServers;
- for (vespalib::string spec : tokenizer) {
- vespalib::StringTokenizer addrTokenizer(spec, ":");
- vespalib::string address = addrTokenizer[0];
- vespalib::string port = addrTokenizer[1];
- if ( !vespalib::SocketAddress::resolve(atoi(port.c_str()), address.c_str()).empty()) {
- if ( !validServers.empty() ) {
- validServers += ',';
- }
- validServers += spec;
- }
- }
- return validServers;
- }
- return input;
-}
-
-
-ZKFacade::ZKFacade(const std::string& zkservers, bool allowDNSFailure)
- :_retriesEnabled(true),
- _watchersEnabled(true),
- _zhandle(zookeeper_init(getValidZKServers(zkservers, allowDNSFailure).c_str(),
- &ZKFacade::stateWatchingFun,
- _zkSessionTimeOut,
- 0, //clientid,
- this, //context,
- 0)) //flags
-{
- if (!_zhandle) {
- throw ZKFailedConnecting("No zhandle", VESPA_STRLOC);
- }
-}
-
-ZKFacade::~ZKFacade() {
- disableRetries();
- _watchersEnabled = false;
- vespalib::Gate done;
- std::thread closer([&done, zhandle=_zhandle] () { zookeeper_close(zhandle); done.countDown(); });
- if ( done.await(50*1000) ) {
- LOGFWD(debug, "Zookeeper connection closed successfully.");
- } else {
- LOGFWD(error, "Not able to close down zookeeper. Dumping core so you can figure out what is wrong");
- abort();
- }
- closer.join();
-}
-
-const std::string
-ZKFacade::getString(const Path& path) {
- Buffer buffer(getData(path));
- return std::string(buffer.begin(), buffer.end());
-}
-
-Buffer
-ZKFacade::getData(const Path& path) {
- RetryController retryController(this);
- Buffer buffer(_maxDataSize);
- int bufferSize = _maxDataSize;
-
- const int watchIsOff = 0;
- do {
- Stat stat;
- bufferSize = _maxDataSize;
-
- retryController( zoo_get(_zhandle, path.string().c_str(), watchIsOff, &*buffer.begin(), &bufferSize, &stat));
- } while(retryController.shouldRetry());
-
- retryController.throwIfError(path);
- buffer.resize(bufferSize);
- return buffer;
-}
-
-Buffer
-ZKFacade::getData(const Path& path, const NodeChangedWatcherSP& watcher) {
- RegistrationGuard unregisterGuard(*this, watcher);
- void* watcherContext = unregisterGuard.get();
- RetryController retryController(this);
-
- Buffer buffer(_maxDataSize);
- int bufferSize = _maxDataSize;
-
- do {
- Stat stat;
- bufferSize = _maxDataSize;
-
- retryController( zoo_wget(_zhandle, path.string().c_str(), &ZKWatcher::watcherFn, watcherContext, &*buffer.begin(), &bufferSize, &stat));
- } while (retryController.shouldRetry());
-
- retryController.throwIfError(path);
-
- buffer.resize(bufferSize);
- unregisterGuard.release();
- return buffer;
-}
-
-void
-ZKFacade::setData(const Path& path, const Buffer& buffer, bool mustExist) {
- return setData(path, &*buffer.begin(), buffer.size(), mustExist);
-}
-
-void
-ZKFacade::setData(const Path& path, const char* buffer, size_t length, bool mustExist) {
- assert (length < _maxDataSize);
-
- if (mustExist || hasNode(path)) {
- setDataForExistingFile(*this, path, buffer, length, _zhandle);
- } else {
- setDataForNewFile(*this, path, buffer, length, _zhandle, 0);
- }
-}
-
-const Path
-ZKFacade::createSequenceNode(const Path& path, const char* buffer, size_t length) {
- assert (length < _maxDataSize);
-
- int createFlags = ZOO_SEQUENCE;
- return setDataForNewFile(*this, path, buffer, length, _zhandle, createFlags);
-}
-
-bool
-ZKFacade::hasNode(const Path& path) {
- RetryController retryController(this);
- do {
- Stat stat;
- const int noWatch = 0;
- retryController( zoo_exists(_zhandle, path.string().c_str(), noWatch, &stat));
- } while(retryController.shouldRetry());
-
- switch(retryController._lastStatus) {
- case ZNONODE:
- return false;
- case ZOK:
- return true;
- default:
- retryController.throwIfError(path);
- //this should never happen:
- assert(false);
- return false;
- }
-}
-
-bool
-ZKFacade::hasNode(const Path& path, const NodeChangedWatcherSP& watcher) {
- RegistrationGuard unregisterGuard(*this, watcher);
- void* watcherContext = unregisterGuard.get();
- RetryController retryController(this);
- do {
- Stat stat;
- retryController(zoo_wexists(_zhandle, path.string().c_str(), &ZKWatcher::watcherFn, watcherContext, &stat));
- } while (retryController.shouldRetry());
-
- bool retval(false);
- switch(retryController._lastStatus) {
- case ZNONODE:
- retval = false;
- break;
- case ZOK:
- retval = true;
- break;
- default:
- retryController.throwIfError(path);
- //this should never happen:
- assert(false);
- retval = false;
- break;
- }
- unregisterGuard.release();
- return retval;
-}
-
-void
-ZKFacade::addEphemeralNode(const Path& path) {
- try {
- setDataForNewFile(*this, path, "", 0, _zhandle, ZOO_EPHEMERAL);
- } catch(const ZKNodeExistsException& e) {
- remove(path);
- addEphemeralNode(path);
- }
-}
-
-void
-ZKFacade::remove(const Path& path) {
- std::vector< std::string > children = getChildren(path);
- if (!children.empty()) {
- std::for_each(children.begin(), children.end(), [&](const std::string & s){ remove(path / s); });
- }
-
- RetryController retryController(this);
- do {
- int ignoreVersion = -1;
- retryController( zoo_delete(_zhandle, path.string().c_str(), ignoreVersion));
- } while (retryController.shouldRetry());
-
- if (retryController._lastStatus != ZNONODE) {
- retryController.throwIfError(path);
- }
-}
-
-void
-ZKFacade::removeIfExists(const Path& path) {
- try {
- if (hasNode(path)) {
- remove(path);
- }
- } catch (const ZKNodeDoesNotExistsException& e) {
- //someone else removed it concurrently, not a problem.
- }
-}
-
-void
-ZKFacade::retainOnly(const Path& path, const std::vector<std::string>& childrenToPreserve) {
- typedef std::vector<std::string> Children;
-
- Children current = getChildren(path);
- std::sort(current.begin(), current.end());
-
- Children toPreserveSorted(childrenToPreserve);
- std::sort(toPreserveSorted.begin(), toPreserveSorted.end());
-
- std::set_difference(current.begin(), current.end(),
- toPreserveSorted.begin(), toPreserveSorted.end(),
- boost::make_function_output_iterator([&](const std::string & s){ remove(path / s); }));
-}
-
-std::vector< std::string >
-ZKFacade::getChildren(const Path& path) {
- RetryController retryController(this);
- String_vector children;
- do {
- const bool watch = false;
- retryController( zoo_get_children(_zhandle, path.string().c_str(), watch, &children));
- } while (retryController.shouldRetry());
-
- retryController.throwIfError(path);
-
- DeallocateZKStringVectorGuard deallocateGuard(children);
-
- typedef std::vector<std::string> ResultType;
- ResultType result;
- result.reserve(children.count);
-
- std::copy(children.data, children.data + children.count, std::back_inserter(result));
-
- return result;
-}
-
-std::vector< std::string >
-ZKFacade::getChildren(const Path& path, const NodeChangedWatcherSP& watcher) {
- RegistrationGuard unregisterGuard(*this, watcher);
- void* watcherContext = unregisterGuard.get();
-
- RetryController retryController(this);
- String_vector children;
- do {
- retryController( zoo_wget_children(_zhandle, path.string().c_str(), &ZKWatcher::watcherFn, watcherContext, &children));
- } while (retryController.shouldRetry());
-
- retryController.throwIfError(path);
-
- DeallocateZKStringVectorGuard deallocateGuard(children);
-
- typedef std::vector<std::string> ResultType;
- ResultType result;
- result.reserve(children.count);
-
- std::copy(children.data, children.data + children.count, std::back_inserter(result));
-
- unregisterGuard.release();
- return result;
-}
-
-
-void
-ZKFacade::disableRetries() {
- _retriesEnabled = false;
-}
-
-ZKLogging::ZKLogging() :
- _file(nullptr)
-{
- std::string filename = vespa::Defaults::underVespaHome("tmp/zookeeper.log");
- _file = std::fopen(filename.c_str(), "w");
- if (_file == nullptr) {
- LOGFWD(error, "Could not open file '%s'", filename.c_str());
- } else {
- zoo_set_log_stream(_file);
- }
-
- zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
-}
-
-ZKLogging::~ZKLogging()
-{
- zoo_set_log_stream(nullptr);
- if (_file != nullptr) {
- std::fclose(_file);
- _file = nullptr;
- }
-}
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.h b/filedistribution/src/vespa/filedistribution/model/zkfacade.h
deleted file mode 100644
index f2451d8191b..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/zkfacade.h
+++ /dev/null
@@ -1,147 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <string>
-#include <vector>
-#include <map>
-#include <mutex>
-
-#include <vespa/filedistribution/common/buffer.h>
-#include <vespa/filedistribution/common/exception.h>
-#include <vespa/vespalib/util/exception.h>
-
-struct _zhandle;
-typedef _zhandle zhandle_t;
-
-namespace filedistribution {
-
-class ZKException : public vespalib::Exception {
-protected:
- using vespalib::Exception::Exception;
-};
-
-VESPA_DEFINE_EXCEPTION(ZKNodeDoesNotExistsException, ZKException);
-VESPA_DEFINE_EXCEPTION(ZKConnectionLossException, ZKException);
-VESPA_DEFINE_EXCEPTION(ZKNodeExistsException, ZKException);
-VESPA_DEFINE_EXCEPTION(ZKFailedConnecting, ZKException);
-VESPA_DEFINE_EXCEPTION(ZKOperationTimeoutException, ZKException);
-VESPA_DEFINE_EXCEPTION(ZKSessionExpired, ZKException);
-
-class ZKGenericException : public ZKException {
-public:
- ZKGenericException(int zkStatus, const vespalib::stringref &msg, const vespalib::stringref &location = "", int skipStack = 0) :
- ZKException(msg, location, skipStack),
- _zkStatus(zkStatus)
- { }
- ZKGenericException(int zkStatus, const vespalib::Exception &cause, const vespalib::stringref &msg = "",
- const vespalib::stringref &location = "", int skipStack = 0) :
- ZKException(msg, cause, location, skipStack),
- _zkStatus(zkStatus)
- { }
- VESPA_DEFINE_EXCEPTION_SPINE(ZKGenericException);
-private:
- const int _zkStatus;
-};
-
-class ZKFacade : public std::enable_shared_from_this<ZKFacade> {
- volatile bool _retriesEnabled;
- volatile bool _watchersEnabled;
-
- zhandle_t* _zhandle;
- const static int _zkSessionTimeOut = 30 * 1000;
- const static size_t _maxDataSize = 1024 * 1024;
-
- class ZKWatcher;
- static void stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context);
-public:
- typedef std::shared_ptr<ZKFacade> SP;
-
- /* Lifetime is managed by ZKFacade.
- Derived classes should only contain weak_ptrs to other objects
- to avoid linking their lifetime to the ZKFacade lifetime.
- */
- class NodeChangedWatcher {
- public:
- NodeChangedWatcher(const NodeChangedWatcher &) = delete;
- NodeChangedWatcher & operator = (const NodeChangedWatcher &) = delete;
- NodeChangedWatcher() = default;
- virtual ~NodeChangedWatcher() {};
- virtual void operator()() = 0;
- };
-
- typedef std::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP;
-
- ZKFacade(const ZKFacade &) = delete;
- ZKFacade & operator = (const ZKFacade &) = delete;
- ZKFacade(const std::string& zkservers, bool allowDNSFailure);
- ~ZKFacade();
-
- bool hasNode(const Path&);
- bool hasNode(const Path&, const NodeChangedWatcherSP&);
-
- const std::string getString(const Path&);
- Buffer getData(const Path&); //throws ZKNodeDoesNotExistsException
- //if watcher is specified, it will be set even if the node does not exists
- Buffer getData(const Path&, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException
-
- //Parent path must exist
- void setData(const Path&, const Buffer& buffer, bool mustExist = false);
- void setData(const Path&, const char* buffer, size_t length, bool mustExist = false);
-
- const Path createSequenceNode(const Path&, const char* buffer, size_t length);
-
- void remove(const Path&); //throws ZKNodeDoesNotExistsException
- void removeIfExists(const Path&);
-
- void retainOnly(const Path&, const std::vector<std::string>& children);
-
- void addEphemeralNode(const Path& path);
- std::vector<std::string> getChildren(const Path& path);
- std::vector<std::string> getChildren(const Path& path, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException
-
- //only for use by shutdown code.
- void disableRetries();
- bool retriesEnabled() {
- return _retriesEnabled;
- }
-
- static std::string getValidZKServers(const std::string &input, bool ignoreDNSFailure);
-
-private:
- class RegistrationGuard {
- public:
- RegistrationGuard & operator = (const RegistrationGuard &) = delete;
- RegistrationGuard(const RegistrationGuard &) = delete;
- RegistrationGuard(ZKFacade & zk, const NodeChangedWatcherSP & watcher) : _zk(zk), _watcherContext(_zk.registerWatcher(watcher)) { }
- ~RegistrationGuard() {
- if (_watcherContext) {
- _zk.unregisterWatcher(_watcherContext);
- }
- }
- void * get() { return _watcherContext; }
- void release() { _watcherContext = nullptr; }
- private:
- ZKFacade & _zk;
- void * _watcherContext;
- };
- void* registerWatcher(const NodeChangedWatcherSP &); //returns watcherContext
- std::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext);
- void invokeWatcher(void* watcherContext);
-
- std::mutex _watchersMutex;
- typedef std::map<void*, std::shared_ptr<ZKWatcher> > WatchersMap;
- WatchersMap _watchers;
-};
-
-class ZKLogging {
-public:
- ZKLogging();
- ~ZKLogging();
- ZKLogging(const ZKLogging &) = delete;
- ZKLogging & operator = (const ZKLogging &) = delete;
-private:
- FILE * _file;
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
deleted file mode 100644
index 9931b104010..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
+++ /dev/null
@@ -1,301 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "filedistributionmodel.h"
-#include "zkfacade.h"
-#include "zkfiledbmodel.h"
-#include "deployedfilestodownload.h"
-#include <vespa/filedistribution/common/logfwd.h>
-#include <sys/file.h>
-
-namespace fs = boost::filesystem;
-
-namespace filedistribution {
-
-namespace {
-
-fs::path
-createPath(const std::string& fileReference) {
- return ZKFileDBModel::_fileDBPath / fileReference;
-}
-
-void
-createNode(const fs::path & path, ZKFacade& zk) {
- if (!zk.hasNode(path))
- zk.setData(path, "", 0);
-}
-
-bool
-isEntryForHost(const std::string& host, const std::string& peerEntry) {
- return host.size() < peerEntry.size() &&
- std::equal(host.begin(), host.end(), peerEntry.begin()) &&
- peerEntry[host.size()] == ZKFileDBModel::_peerEntrySeparator;
-}
-
-std::vector<std::string>
-getSortedChildren(ZKFacade& zk, const Path& path) {
- std::vector<std::string> children = zk.getChildren(path);
- std::sort(children.begin(), children.end());
- return children;
-}
-
-} //anonymous namespace
-
-VESPA_IMPLEMENT_EXCEPTION(InvalidProgressException, vespalib::Exception);
-VESPA_IMPLEMENT_EXCEPTION(InvalidHostStatusException, vespalib::Exception);
-
-const Path ZKFileDBModel::_root = "/vespa/filedistribution";
-const Path ZKFileDBModel::_fileDBPath = _root / "files";
-const Path ZKFileDBModel::_hostsPath = _root / "hosts";
-
-bool
-ZKFileDBModel::hasFile(const std::string& fileReference) {
- return _zk->hasNode(createPath(fileReference));
-}
-
-void
-ZKFileDBModel::addFile(const std::string& fileReference, const Buffer& buffer) {
- return _zk->setData(createPath(fileReference), buffer);
-}
-
-Buffer
-ZKFileDBModel::getFile(const std::string& fileReference) {
- try {
- return _zk->getData(createPath(fileReference));
- } catch(const ZKNodeDoesNotExistsException & e) {
- throw FileDoesNotExistException(fileReference, e, VESPA_STRLOC);
- }
-}
-
-void
-ZKFileDBModel::setDeployedFilesToDownload(
- const std::string& hostName,
- const std::string& appId,
- const std::vector<std::string>& files) {
- DeployedFilesToDownload d(_zk.get());
- d.setDeployedFilesToDownload(hostName, appId, files);
-}
-
-void
-ZKFileDBModel::cleanDeployedFilesToDownload(
- const std::vector<std::string>& hostsToPreserve,
- const std::string& appId) {
-
- std::vector<std::string> allHosts = getHosts();
- std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end());
-
- for (auto & host : allHosts) {
- Path hostPath = _hostsPath / host;
- try {
- // If this host is NOT part of hosts to deploy to
- if (toPreserve.find(host) == toPreserve.end()) {
- removeDeployFileNodes(hostPath, appId);
- if (canRemoveHost(hostPath, appId)) {
- _zk->remove(hostPath);
- }
- }
- } catch (const ZKNodeDoesNotExistsException& e) {
- LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str());
- }
- }
-}
-
-void
-ZKFileDBModel::removeDeploymentsThatHaveDifferentApplicationId(
- const std::vector<std::string>& hostsToPreserve,
- const std::string& appId) {
-
- std::vector<std::string> allHosts = getHosts();
- std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end());
-
- for (auto & host : allHosts) {
- Path hostPath = _hostsPath / host;
- try {
- if (toPreserve.find(host) != toPreserve.end()) {
- removeNonApplicationFiles(hostPath, appId);
- }
- } catch (const ZKNodeDoesNotExistsException& e) {
- LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str());
- }
- }
-}
-
-
-// Delete files which do not belong to this application.
-void
-ZKFileDBModel::removeNonApplicationFiles(const Path & hostPath, const std::string& appId)
-{
- std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
- for (auto & deployNode : deployNodes) {
- Path deployNodePath = hostPath / deployNode;
- std::string applicationId(readApplicationId(*_zk, deployNodePath));
- if (appId != applicationId) {
- _zk->remove(deployNodePath);
- }
- }
-}
-
-void
-ZKFileDBModel::removeDeployFileNodes(const Path & hostPath, const std::string& appId) {
- std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
- for (auto & deployNode : deployNodes) {
- Path deployNodePath = hostPath / deployNode;
- std::string applicationId(readApplicationId(*_zk, deployNodePath));
- if (appId == applicationId) {
- _zk->remove(deployNodePath);
- }
- }
-}
-
-bool
-ZKFileDBModel::canRemoveHost(const Path & hostPath, const std::string& appId) {
- std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
- for (auto & deployNode : deployNodes) {
- Path deployNodePath = hostPath / deployNode;
- std::string applicationId(readApplicationId(*_zk, deployNodePath));
- if (appId != applicationId) {
- return false;
- }
- }
- return true;
-}
-
-std::vector<std::string>
-ZKFileDBModel::getHosts() {
- try {
- return _zk->getChildren(_hostsPath);
- } catch(ZKNodeDoesNotExistsException&) {
- LOGFWD(debug, "No files to be distributed.");
- return std::vector<std::string>();
- }
-}
-
-namespace {
-const ZKFileDBModel::Progress::value_type notStarted = 101;
-};
-
-//TODO: Refactor
-ZKFileDBModel::HostStatus
-ZKFileDBModel::getHostStatus(const std::string& hostName) {
- typedef std::vector<std::string> PeerEntries;
-
- DeployedFilesToDownload d(_zk.get());
- DeployedFilesToDownload::FileReferences filesToDownload = d.getLatestDeployedFilesToDownload(hostName);
-
- HostStatus hostStatus;
- hostStatus._state = HostStatus::notStarted;
- hostStatus._numFilesToDownload = filesToDownload.size();
- hostStatus._numFilesFinished = 0;
-
- for (const std::string & file : filesToDownload) {
- Path path = getPeersPath(file);
-
- const PeerEntries peerEntries = getSortedChildren(*_zk, path);
- PeerEntries::const_iterator candidate =
- std::lower_bound(peerEntries.begin(), peerEntries.end(), hostName);
-
- if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate)) {
- char fileProgressPercentage = getProgress(path / (*candidate));
- if (fileProgressPercentage == 100) {
- hostStatus._numFilesFinished++;
- } else if (fileProgressPercentage != notStarted) {
- hostStatus._state = HostStatus::inProgress;
- }
-
- candidate++;
- if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate))
- throw InvalidHostStatusException(path.string(), VESPA_STRLOC);
- }
- }
-
-
- if (hostStatus._numFilesToDownload == hostStatus._numFilesFinished) {
- hostStatus._state = HostStatus::finished;
- }
-
- return hostStatus;
-}
-
-void
-ZKFileDBModel::cleanFiles(const std::vector<std::string>& filesToPreserve) {
- _zk->retainOnly(_fileDBPath, filesToPreserve);
-}
-
-ZKFileDBModel::ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk)
- : _zk(zk)
-{
- createNode(_root, *_zk);
- createNode(_fileDBPath, *_zk);
- createNode(_hostsPath, *_zk);
-}
-
-ZKFileDBModel::~ZKFileDBModel() {}
-
-char
-ZKFileDBModel::getProgress(const Path& path) {
- try {
- Buffer buffer(_zk->getData(path));
- if (buffer.size() == 1)
- return buffer[0];
- else if (buffer.size() == 0)
- return 0;
- else {
- throw InvalidProgressException(path.string(), VESPA_STRLOC);
- }
- } catch (ZKNodeDoesNotExistsException& e) {
- //progress information deleted
- return notStarted;
- }
-}
-
-ZKFileDBModel::Progress
-ZKFileDBModel::getProgress(const std::string& fileReference,
- const std::vector<std::string>& hostsSortedAscending) {
- Path path = getPeersPath(fileReference);
-
- Progress progress;
- progress.reserve(hostsSortedAscending.size());
-
- typedef std::vector<std::string> PeerEntries;
- const PeerEntries peerEntries = getSortedChildren(*_zk, path);
-
- PeerEntries::const_iterator current = peerEntries.begin();
- for (const std::string& host : hostsSortedAscending) {
- PeerEntries::const_iterator candidate =
- std::lower_bound(current, peerEntries.end(), host);
-
- ZKFileDBModel::Progress::value_type hostProgress = notStarted;
- if (candidate != peerEntries.end()) {
- current = candidate;
- if (isEntryForHost(host, *current))
- hostProgress = getProgress(path / (*candidate));
- }
- progress.push_back(hostProgress);
- }
- return progress;
-}
-
-FileDBModel::~FileDBModel() {}
-
-DirectoryGuard::DirectoryGuard(Path path) :
- _fd(-1)
-{
- _fd = open(path.c_str(), O_RDONLY);
- assert(_fd != -1);
- int retval = flock(_fd, LOCK_EX);
- while ((retval != 0) && (errno == EINTR)) {
- std::cout << "Got interupted while flock'ing ' " << path << std::endl;
- retval = flock(_fd, LOCK_EX);
- }
- assert(retval == 0);
-}
-
-DirectoryGuard::~DirectoryGuard() {
- if (_fd != -1) {
- int retval = flock(_fd, LOCK_UN);
- assert(retval == 0);
- retval = close(_fd);
- assert(retval ==0);
- }
-}
-
-}
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
deleted file mode 100644
index 4a89a9547a9..00000000000
--- a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "filedistributionmodel.h"
-#include "zkfacade.h"
-
-namespace filedistribution {
-
-class ZKFileDBModel : public FileDBModel {
-private:
- const std::shared_ptr<ZKFacade> _zk;
- char getProgress(const Path& path);
- void removeDeployFileNodes(const Path& hostPath, const std::string& appId);
- bool canRemoveHost(const Path& hostPath, const std::string& appId);
-public:
- const static Path _root;
- const static Path _fileDBPath;
- const static Path _hostsPath;
-
- const static char _peerEntrySeparator = ':';
-
- Path getPeersPath(const std::string& fileReference) {
- return _fileDBPath / fileReference;
- }
-
- //overrides
- bool hasFile(const std::string& fileReference) override;
- void addFile(const std::string& fileReference, const Buffer& buffer) override;
- Buffer getFile(const std::string& fileReference) override;
- void cleanFiles(const std::vector<std::string>& filesToPreserve) override;
-
- void setDeployedFilesToDownload(const std::string& hostName,
- const std::string & appId,
- const std::vector<std::string> & files) override;
- void cleanDeployedFilesToDownload(
- const std::vector<std::string> & hostsToPreserve,
- const std::string& appId) override;
- void removeDeploymentsThatHaveDifferentApplicationId(
- const std::vector<std::string> & hostsToPreserve,
- const std::string& appId) override;
- void removeNonApplicationFiles(
- const Path & hostPath,
- const std::string& appId);
- std::vector<std::string> getHosts() override;
- HostStatus getHostStatus(const std::string& hostName) override;
-
- ZKFileDBModel(const std::shared_ptr<ZKFacade>& zk);
- ~ZKFileDBModel();
-
- Progress getProgress(const std::string& fileReference,
- const std::vector<std::string>& hostsSortedAscending) override;
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt
deleted file mode 100644
index bc03f71eae7..00000000000
--- a/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(filedistribution_filedistributorrpc STATIC
- SOURCES
- filedistributorrpc.cpp
- DEPENDS
-)
diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
deleted file mode 100644
index ff1248af157..00000000000
--- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
+++ /dev/null
@@ -1,267 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "filedistributorrpc.h"
-#include <mutex>
-
-#include <vespa/fnet/frt/frt.h>
-#include <vespa/frtstream/frtserverstream.h>
-#include <map>
-
-#include "fileprovider.h"
-#include <vespa/filedistribution/model/filedbmodel.h>
-#include <vespa/log/log.h>
-LOG_SETUP(".filedistributorrpc");
-
-using filedistribution::FileDistributorRPC;
-using filedistribution::FileProvider;
-
-namespace fs = boost::filesystem;
-
-namespace {
-typedef std::lock_guard<std::mutex> LockGuard;
-
-struct RPCErrorCodes {
- const static uint32_t baseErrorCode = 0x10000;
- const static uint32_t baseFileProviderErrorCode = baseErrorCode + 0x1000;
-
- const static uint32_t unknownError = baseErrorCode + 1;
-};
-
-class QueuedRequests {
- bool _shuttingDown;
-
- std::mutex _mutex;
- typedef std::multimap<std::string, FRT_RPCRequest*> Map;
- Map _queuedRequests;
-
- template <class FUNC>
- void returnAnswer(const std::string& fileReference, FUNC func) {
- LockGuard guard(_mutex);
-
- typedef Map::iterator iterator;
- std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference);
-
- for (iterator it(range.first); it != range.second; it++) {
- const Map::value_type & request(*it);
- LOG(info, "Returning earlier enqueued request for file reference '%s'.", request.first.c_str());
- func(*request.second);
- request.second->Return();
- }
-
- _queuedRequests.erase(range.first, range.second);
- }
-
- struct DownloadFinished {
- const std::string& _path;
-
- void operator()(FRT_RPCRequest& request) {
- LOG(info, "Download finished: '%s'", _path.c_str());
- frtstream::FrtServerStream requestHandler(&request);
- requestHandler <<_path;
- }
-
- DownloadFinished(const std::string& path)
- :_path(path)
- {}
- };
-
- struct DownloadFailed {
- FileProvider::FailedDownloadReason _reason;
-
- void operator()(FRT_RPCRequest& request) {
- LOG(info, "Download failed: '%d'", _reason);
- request.SetError(RPCErrorCodes::baseFileProviderErrorCode + _reason, "Download failed");
- }
-
- DownloadFailed(FileProvider::FailedDownloadReason reason)
- :_reason(reason)
- {}
- };
-
-public:
- QueuedRequests()
- :_shuttingDown(false)
- {}
-
- void enqueue(const std::string& fileReference, FRT_RPCRequest* request) {
- LockGuard guard(_mutex);
-
- if (_shuttingDown) {
- LOG(info, "Shutdown: Aborting request for file reference '%s'.", fileReference.c_str());
- abort(request);
- } else {
- _queuedRequests.insert(std::make_pair(fileReference, request));
- }
- }
-
- void abort(FRT_RPCRequest* request) {
- request->SetError(FRTE_RPC_ABORT);
- request->Return();
- }
-
- void dequeue(const std::string& fileReference, FRT_RPCRequest* request) {
- LockGuard guard(_mutex);
-
- typedef Map::iterator iterator;
- std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference);
-
- iterator candidate = std::find(range.first, range.second,
- std::pair<const std::string, FRT_RPCRequest*>(fileReference, request));
-
- if (candidate != range.second)
- _queuedRequests.erase(candidate);
- }
-
- void downloadFinished(const std::string& fileReference, const fs::path& path) {
-
- DownloadFinished handler(path.string());
- returnAnswer(fileReference, handler);
- }
-
- void downloadFailed(const std::string& fileReference, FileProvider::FailedDownloadReason reason) {
-
- DownloadFailed handler(reason);
- returnAnswer(fileReference, handler);
- }
-
- void shutdown() {
- LockGuard guard(_mutex);
- _shuttingDown = true;
-
- for (const Map::value_type& request : _queuedRequests) {
- LOG(info, "Shutdown: Aborting earlier enqueued request for file reference '%s'.", request.first.c_str());
- abort(request.second);
- }
- _queuedRequests.erase(_queuedRequests.begin(), _queuedRequests.end());
- }
-};
-
-} //anonymous namespace
-
-class FileDistributorRPC::Server : public FRT_Invokable {
- public:
- FileProvider::SP _fileProvider;
- std::unique_ptr<FRT_Supervisor> _supervisor;
-
- QueuedRequests _queuedRequests;
-
- boost::signals2::scoped_connection _downloadCompletedConnection;
- boost::signals2::scoped_connection _downloadFailedConnection;
-
- void queueRequest(const std::string& fileReference, FRT_RPCRequest* request);
- void defineMethods();
-
- Server(const Server &) = delete;
- Server & operator = (const Server &) = delete;
- Server(int listen_port, const FileProvider::SP & provider);
- void start(const FileDistributorRPC::SP & parent);
- ~Server();
-
- void waitFor(FRT_RPCRequest*);
-};
-
-FileDistributorRPC::
-Server::Server(int listen_port, const FileProvider::SP & provider)
- :_fileProvider(provider),
- _supervisor(new FRT_Supervisor())
-{
- defineMethods();
- _supervisor->Listen(listen_port);
- _supervisor->Start();
-}
-
-
-FileDistributorRPC::Server::~Server() {
- _queuedRequests.shutdown();
-
- const bool waitForFinished = true;
- _supervisor->ShutDown(waitForFinished);
-}
-
-void
-FileDistributorRPC::Server::start(const FileDistributorRPC::SP & parent) {
- _downloadCompletedConnection =
- _fileProvider->downloadCompleted().connect(FileProvider::DownloadCompletedSignal::slot_type(
- [&] (const std::string &file, const fs::path& path) { _queuedRequests.downloadFinished(file, path); })
- .track_foreign(parent));
-
- _downloadFailedConnection =
- _fileProvider->downloadFailed().connect(FileProvider::DownloadFailedSignal::slot_type(
- [&] (const std::string& file, FileProvider::FailedDownloadReason reason) { _queuedRequests.downloadFailed(file, reason); })
- .track_foreign(parent));
-
-
-}
-
-void
-FileDistributorRPC::
-Server::queueRequest(const std::string& fileReference, FRT_RPCRequest* request) {
- _queuedRequests.enqueue( fileReference, request );
- try {
- _fileProvider->downloadFile(fileReference);
- } catch(...) {
- _queuedRequests.dequeue(fileReference, request);
- throw;
- }
-}
-
-void
-FileDistributorRPC::Server::defineMethods() {
- const bool instant = true;
- FRT_ReflectionBuilder builder(_supervisor.get());
- builder.DefineMethod("waitFor", "s", "s", instant,
- FRT_METHOD(Server::waitFor), this);
-}
-
-void
-FileDistributorRPC::Server::waitFor(FRT_RPCRequest* request) {
- try {
- frtstream::FrtServerStream requestHandler(request);
- std::string fileReference;
- requestHandler >> fileReference;
- boost::optional<fs::path> path = _fileProvider->getPath(fileReference);
- if (path) {
- LOG(debug, "Returning request for file reference '%s'.", fileReference.c_str());
- requestHandler << path->string();
- } else {
- LOG(debug, "Enqueuing file request for file reference '%s'.", fileReference.c_str());
- request->Detach();
- queueRequest(fileReference, request);
- }
- } catch (const FileDoesNotExistException&) {
- LOG(warning, "Received a request for a file reference that does not exist in zookeeper.");
- request->SetError(RPCErrorCodes::baseFileProviderErrorCode + FileProvider::FileReferenceDoesNotExist,
- "No such file reference");
- request->Return();
- } catch (const std::exception& e) {
- LOG(error, "An exception occurred while calling the rpc method waitFor:%s", e.what());
- request->SetError(RPCErrorCodes::unknownError, e.what());
- request->Return(); //the request might be detached.
- }
-}
-
-FileDistributorRPC::FileDistributorRPC(const std::string& connectionSpec,
- const FileProvider::SP & provider)
- :_server(new Server(get_port(connectionSpec), provider))
-{}
-
-void
-FileDistributorRPC::start()
-{
- _server->start(shared_from_this());
-}
-
-int
-FileDistributorRPC::get_port(const std::string &spec)
-{
- const char *port = (spec.data() + spec.size());
- while ((port > spec.data()) && (port[-1] >= '0') && (port[-1] <= '9')) {
- --port;
- }
- return atoi(port);
-}
-
-filedistribution::FileDistributorRPC::~FileDistributorRPC()
-{
- LOG(debug, "Deconstructing FileDistributorRPC");
-}
diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
deleted file mode 100644
index cf9ed110d36..00000000000
--- a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <memory>
-
-#include "fileprovider.h"
-
-namespace filedistribution {
-
-class FileDistributorRPC : public std::enable_shared_from_this<FileDistributorRPC>
-{
- class Server;
-public:
- using SP = std::shared_ptr<FileDistributorRPC>;
- FileDistributorRPC(const FileDistributorRPC &) = delete;
- FileDistributorRPC & operator = (const FileDistributorRPC &) = delete;
- FileDistributorRPC(const std::string& connectSpec, const FileProvider::SP & provider);
-
- void start();
-
- static int get_port(const std::string &spec);
-
- ~FileDistributorRPC();
-private:
- std::unique_ptr<Server> _server;
-};
-
-} //namespace filedistribution
-
diff --git a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
deleted file mode 100644
index a92d09408b1..00000000000
--- a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/filedistribution/common/exception.h>
-#include<boost/optional.hpp>
-#include<boost/signals2.hpp>
-
-namespace filedistribution {
-
-class FileProvider
-{
-public:
- using SP = std::shared_ptr<FileProvider>;
- typedef boost::signals2::signal<void (const std::string& /* fileReference */, const Path&)> DownloadCompletedSignal;
- typedef DownloadCompletedSignal::slot_type DownloadCompletedHandler;
-
- enum FailedDownloadReason {
- FileReferenceDoesNotExist,
- FileReferenceRemoved
- };
-
- typedef boost::signals2::signal<void (const std::string& /* fileReference */, FailedDownloadReason)> DownloadFailedSignal;
- typedef DownloadFailedSignal::slot_type DownloadFailedHandler;
-
- virtual boost::optional<Path> getPath(const std::string& fileReference) = 0;
- virtual void downloadFile(const std::string& fileReference) = 0;
-
- virtual ~FileProvider() {}
-
- //Signals
- virtual DownloadCompletedSignal& downloadCompleted() = 0;
- virtual DownloadFailedSignal& downloadFailed() = 0;
-};
-
-} //namespace filedistribution
-