aboutsummaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /filedistribution
Publish
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/.gitignore7
-rw-r--r--filedistribution/CMakeLists.txt39
-rw-r--r--filedistribution/OWNERS1
-rw-r--r--filedistribution/pom.xml48
-rw-r--r--filedistribution/src/.gitignore3
-rw-r--r--filedistribution/src/apps/.gitignore1
-rw-r--r--filedistribution/src/apps/filedistributor/.gitignore2
-rw-r--r--filedistribution/src/apps/filedistributor/CMakeLists.txt19
-rw-r--r--filedistribution/src/apps/filedistributor/filedistributor.cpp401
-rw-r--r--filedistribution/src/apps/status/.gitignore2
-rw-r--r--filedistribution/src/apps/status/CMakeLists.txt16
-rw-r--r--filedistribution/src/apps/status/status-filedistribution.cpp185
-rw-r--r--filedistribution/src/apps/status/vespa-status-filedistribution.sh68
-rw-r--r--filedistribution/src/tests/.gitignore2
-rw-r--r--filedistribution/src/tests/common/.gitignore1
-rw-r--r--filedistribution/src/tests/common/CMakeLists.txt13
-rw-r--r--filedistribution/src/tests/common/testCommon.cpp42
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/.gitignore1
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt16
-rw-r--r--filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp54
-rw-r--r--filedistribution/src/tests/filedownloader/.gitignore1
-rw-r--r--filedistribution/src/tests/filedownloader/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/filedownloader/testfiledownloader.cpp158
-rw-r--r--filedistribution/src/tests/lib/CMakeLists.txt6
-rw-r--r--filedistribution/src/tests/lib/mock-zookeeper.cpp326
-rw-r--r--filedistribution/src/tests/rpc/.gitignore1
-rw-r--r--filedistribution/src/tests/rpc/CMakeLists.txt14
-rw-r--r--filedistribution/src/tests/rpc/mockfileprovider.h50
-rw-r--r--filedistribution/src/tests/rpc/testfileprovider.cpp66
-rw-r--r--filedistribution/src/tests/scheduler/.gitignore1
-rw-r--r--filedistribution/src/tests/scheduler/CMakeLists.txt16
-rw-r--r--filedistribution/src/tests/scheduler/test-scheduler.cpp110
-rw-r--r--filedistribution/src/tests/status/.gitignore1
-rw-r--r--filedistribution/src/tests/status/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/status/test-status.cpp19
-rw-r--r--filedistribution/src/tests/zkfacade/.gitignore1
-rw-r--r--filedistribution/src/tests/zkfacade/CMakeLists.txt15
-rw-r--r--filedistribution/src/tests/zkfacade/test-zkfacade.cpp226
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/.gitignore1
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt16
-rw-r--r--filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp99
-rw-r--r--filedistribution/src/vespa/.gitignore3
-rw-r--r--filedistribution/src/vespa/filedistribution/common/CMakeLists.txt19
-rw-r--r--filedistribution/src/vespa/filedistribution/common/buffer.h151
-rw-r--r--filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp89
-rw-r--r--filedistribution/src/vespa/filedistribution/common/componentsdeleter.h71
-rw-r--r--filedistribution/src/vespa/filedistribution/common/concurrentqueue.h53
-rw-r--r--filedistribution/src/vespa/filedistribution/common/exception.cpp24
-rw-r--r--filedistribution/src/vespa/filedistribution/common/exception.h65
-rw-r--r--filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h47
-rw-r--r--filedistribution/src/vespa/filedistribution/common/logfwd.h20
-rw-r--r--filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp47
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt15
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributor.def10
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp203
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h45
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp437
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.h94
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp148
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h68
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/hostname.cpp22
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/hostname.h22
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp49
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/scheduler.h45
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp40
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/signalhandling.h15
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp8
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h21
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/.gitignore5
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt27
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp90
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/createtorrent.h23
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/field.h46
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedb.cpp57
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedb.h18
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp236
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/jnistring.h78
-rw-r--r--filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp26
-rw-r--r--filedistribution/src/vespa/filedistribution/model/CMakeLists.txt20
-rw-r--r--filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp158
-rw-r--r--filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h56
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedbmodel.h56
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h43
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp239
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h76
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filereferences.def3
-rw-r--r--filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h61
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfacade.cpp648
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfacade.h135
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp298
-rw-r--r--filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h59
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt6
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp279
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h30
-rw-r--r--filedistribution/src/vespa/filedistribution/rpc/fileprovider.h39
-rw-r--r--filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt7
-rw-r--r--filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def3
97 files changed, 6731 insertions, 0 deletions
diff --git a/filedistribution/.gitignore b/filedistribution/.gitignore
new file mode 100644
index 00000000000..542ce6cebef
--- /dev/null
+++ b/filedistribution/.gitignore
@@ -0,0 +1,7 @@
+Makefile.ini
+config_command.sh
+project.dsw
+/pom.xml.build
+/target
+Makefile
+Testing
diff --git a/filedistribution/CMakeLists.txt b/filedistribution/CMakeLists.txt
new file mode 100644
index 00000000000..6205f44515b
--- /dev/null
+++ b/filedistribution/CMakeLists.txt
@@ -0,0 +1,39 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_define_module(
+ DEPENDS
+ fastos
+ vespalog
+ fnet
+ frtstream
+ configdefinitions
+ config_cloudconfig
+ staging_vespalib
+ vespadefaults
+ vespalib
+
+ EXTERNAL_DEPENDS
+ torrent-rasterbar
+
+ LIBS
+ src/vespa/filedistribution/rpc
+ src/vespa/filedistribution/common
+ src/vespa/filedistribution/manager
+ src/vespa/filedistribution/rpcconfig
+ src/vespa/filedistribution/distributor
+ src/vespa/filedistribution/model
+
+ TESTS
+ src/tests/zkfacade
+ src/tests/zkfiledbmodel
+ src/tests/filedbmodelimpl
+ src/tests/status
+ src/tests/scheduler
+ src/tests/rpc
+ src/tests/common
+ src/tests/filedownloader
+ src/tests/lib
+
+ APPS
+ src/apps/status
+ src/apps/filedistributor
+)
diff --git a/filedistribution/OWNERS b/filedistribution/OWNERS
new file mode 100644
index 00000000000..31af040f698
--- /dev/null
+++ b/filedistribution/OWNERS
@@ -0,0 +1 @@
+bratseth
diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml
new file mode 100644
index 00000000000..ea5fb5b450e
--- /dev/null
+++ b/filedistribution/pom.xml
@@ -0,0 +1,48 @@
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>filedistribution</artifactId>
+ <version>6-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <dependencies>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-lib</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-class-plugin</artifactId>
+ <version>${project.version}</version>
+ <configuration>
+ <defFilesDirectories>
+ src/vespa/filedistribution/distributor,
+ src/vespa/filedistribution/model,
+ src/vespa/filedistribution/rpcconfig
+ </defFilesDirectories>
+ </configuration>
+ <executions>
+ <execution>
+ <id>config-gen</id>
+ <goals>
+ <goal>config-gen</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/filedistribution/src/.gitignore b/filedistribution/src/.gitignore
new file mode 100644
index 00000000000..420f4e53d8a
--- /dev/null
+++ b/filedistribution/src/.gitignore
@@ -0,0 +1,3 @@
+.depend
+/Makefile.ini
+/config_command.sh
diff --git a/filedistribution/src/apps/.gitignore b/filedistribution/src/apps/.gitignore
new file mode 100644
index 00000000000..f3c7a7c5da6
--- /dev/null
+++ b/filedistribution/src/apps/.gitignore
@@ -0,0 +1 @@
+Makefile
diff --git a/filedistribution/src/apps/filedistributor/.gitignore b/filedistribution/src/apps/filedistributor/.gitignore
new file mode 100644
index 00000000000..a41b1963b70
--- /dev/null
+++ b/filedistribution/src/apps/filedistributor/.gitignore
@@ -0,0 +1,2 @@
+/filedistributor
+filedistributor-bin
diff --git a/filedistribution/src/apps/filedistributor/CMakeLists.txt b/filedistribution/src/apps/filedistributor/CMakeLists.txt
new file mode 100644
index 00000000000..f7be9d46c76
--- /dev/null
+++ b/filedistribution/src/apps/filedistributor/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_filedistributor_app
+ SOURCES
+ filedistributor.cpp
+ OUTPUT_NAME filedistributor-bin
+ INSTALL sbin
+ DEPENDS
+ filedistribution_distributor
+ filedistribution_filedistributionmodel
+ filedistribution_filedistributorrpcconfig
+ filedistribution_filedistributorrpc
+ filedistribution_common
+)
+target_compile_options(filedistribution_filedistributor_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedistributor_app boost boost_unit_test_framework-mt-d)
diff --git a/filedistribution/src/apps/filedistributor/filedistributor.cpp b/filedistribution/src/apps/filedistributor/filedistributor.cpp
new file mode 100644
index 00000000000..d284385784d
--- /dev/null
+++ b/filedistribution/src/apps/filedistributor/filedistributor.cpp
@@ -0,0 +1,401 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <iostream>
+#include <string>
+#include <cstdlib>
+
+#include <boost/program_options.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/exception/diagnostic_information.hpp>
+#include <boost/scope_exit.hpp>
+
+#include <vespa/fastos/app.h>
+#include <vespa/config-zookeepers.h>
+
+#include <vespa/filedistribution/rpcconfig/config-filedistributorrpc.h>
+#include <vespa/filedistribution/distributor/config-filedistributor.h>
+#include <vespa/filedistribution/model/config-filereferences.h>
+
+#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h>
+#include <vespa/filedistribution/distributor/filedownloadermanager.h>
+#include <vespa/filedistribution/distributor/filedownloader.h>
+#include <vespa/filedistribution/distributor/signalhandling.h>
+#include <vespa/filedistribution/distributor/state_server_impl.h>
+
+#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
+#include <vespa/filedistribution/rpc/filedistributorrpc.h>
+#include <vespa/filedistribution/common/exception.h>
+#include <vespa/filedistribution/common/componentsdeleter.h>
+
+namespace {
+const char* programName = "filedistributor";
+}
+
+#include <vespa/log/log.h>
+LOG_SETUP(programName);
+
+namespace ll = boost::lambda;
+
+using namespace filedistribution;
+using cloud::config::ZookeepersConfig;
+using cloud::config::filedistribution::FiledistributorConfig;
+using cloud::config::filedistribution::FiledistributorrpcConfig;
+
+class FileDistributor : public config::IFetcherCallback<ZookeepersConfig>,
+ public config::IFetcherCallback<FiledistributorConfig>,
+ public config::IFetcherCallback<FiledistributorrpcConfig>,
+ public config::IGenerationCallback,
+ boost::noncopyable
+{
+ class Components : boost::noncopyable {
+ ComponentsDeleter _componentsDeleter;
+ public:
+ const boost::shared_ptr<ZKFacade> _zk;
+ const boost::shared_ptr<FileDistributionModelImpl> _model;
+ const boost::shared_ptr<FileDistributorTrackerImpl> _tracker;
+ const boost::shared_ptr<FileDownloader> _downloader;
+ const boost::shared_ptr<FileDownloaderManager> _manager;
+ const boost::shared_ptr<FileDistributorRPC> _rpcHandler;
+ const boost::shared_ptr<StateServerImpl> _stateServer;
+
+ private:
+ boost::thread _downloaderEventLoopThread;
+ config::ConfigFetcher _configFetcher;
+
+
+ template <class T>
+ typename boost::shared_ptr<T> track(T* component) {
+ return _componentsDeleter.track(component);
+ }
+
+ public:
+
+ Components(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower,
+ const config::ConfigUri & configUri,
+ const ZookeepersConfig& zooKeepersConfig,
+ const FiledistributorConfig& fileDistributorConfig,
+ const FiledistributorrpcConfig& rpcConfig)
+ :_zk(track(new ZKFacade(zooKeepersConfig.zookeeperserverlist, exceptionRethrower))),
+ _model(track(new FileDistributionModelImpl(
+ fileDistributorConfig.hostname,
+ fileDistributorConfig.torrentport,
+ _zk,
+ exceptionRethrower))),
+ _tracker(track(new FileDistributorTrackerImpl(_model, exceptionRethrower))),
+ _downloader(track(new FileDownloader(
+ _tracker,
+ fileDistributorConfig.hostname,
+ fileDistributorConfig.torrentport,
+ boost::filesystem::path(fileDistributorConfig.filedbpath),
+ exceptionRethrower))),
+ _manager(track(new FileDownloaderManager(_downloader, _model))),
+ _rpcHandler(track(new FileDistributorRPC(rpcConfig.connectionspec, _manager))),
+ _stateServer(track(new StateServerImpl(fileDistributorConfig.stateport))),
+ _downloaderEventLoopThread(
+ ll::bind(&FileDownloader::runEventLoop, _downloader.get())),
+ _configFetcher(configUri.getContext())
+
+ {
+ _manager->start();
+ _rpcHandler->start();
+
+ _tracker->setDownloader(_downloader);
+ _configFetcher.subscribe<FilereferencesConfig>(configUri.getConfigId(), _model.get());
+ _configFetcher.start();
+ updatedConfig(_configFetcher.getGeneration());
+ }
+
+ void updatedConfig(int64_t generation) {
+ vespalib::ComponentConfigProducer::Config curr("filedistributor", generation);
+ _stateServer->myComponents.addConfig(curr);
+ }
+
+ ~Components() {
+ _configFetcher.close();
+ //Do not waste time retrying zookeeper operations when going down.
+ _zk->disableRetries();
+
+ _downloaderEventLoopThread.interrupt();
+ _downloaderEventLoopThread.join();
+ }
+
+ };
+
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _configMutex;
+
+ bool _completeReconfigurationNeeded;
+ std::unique_ptr<ZookeepersConfig> _zooKeepersConfig;
+ std::unique_ptr<FiledistributorConfig> _fileDistributorConfig;
+ std::unique_ptr<FiledistributorrpcConfig> _rpcConfig;
+
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ std::unique_ptr<Components> _components;
+public:
+ FileDistributor()
+ : _configMutex(),
+ _completeReconfigurationNeeded(false),
+ _zooKeepersConfig(),
+ _fileDistributorConfig(),
+ _rpcConfig(),
+ _exceptionRethrower(),
+ _components()
+ { }
+
+ void notifyGenerationChange(int64_t generation) {
+ if (_components && ! completeReconfigurationNeeded()) {
+ _components->updatedConfig(generation);
+ }
+ }
+
+ //configure overrides
+ void configure(std::unique_ptr<ZookeepersConfig> config) {
+ LockGuard guard(_configMutex);
+ _zooKeepersConfig = std::move(config);
+ _completeReconfigurationNeeded = true;
+ }
+
+ void configure(std::unique_ptr<FiledistributorConfig> config) {
+ LockGuard guard(_configMutex);
+ if (_fileDistributorConfig.get() != NULL &&
+ (config->torrentport != _fileDistributorConfig->torrentport ||
+ config->stateport != _fileDistributorConfig->stateport ||
+ config->hostname != _fileDistributorConfig->hostname ||
+ config->filedbpath != _fileDistributorConfig->filedbpath))
+ {
+ _completeReconfigurationNeeded = true;
+ } else if (_components.get()) {
+ configureSpeedLimits(*config);
+ }
+ _fileDistributorConfig = std::move(config);
+
+ }
+
+ void configure(std::unique_ptr<FiledistributorrpcConfig> config) {
+ LockGuard guard(_configMutex);
+ _rpcConfig = std::move(config);
+ _completeReconfigurationNeeded = true;
+ }
+
+ void run(const config::ConfigUri & configUri) {
+ while (!askedToShutDown()) {
+ clearReinitializeFlag();
+ _exceptionRethrower.reset(new ExceptionRethrower());
+ runImpl(configUri);
+
+ if (_exceptionRethrower->exceptionStored())
+ _exceptionRethrower->rethrow();
+ }
+ }
+
+ static void ensureExceptionsStored(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower) {
+ //TODO: this is somewhat hackish, refactor to eliminate this later.
+ LOG(debug, "Waiting for shutdown");
+ for (int i=0;
+ i<50 && !exceptionRethrower.unique();
+ ++i) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+ }
+ LOG(debug, "Done waiting for shutdown");
+
+ if (!exceptionRethrower.unique()) {
+ EV_STOPPING(programName, "Forced termination");
+ kill(getpid(), SIGKILL);
+ }
+ }
+
+ void createComponents(const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower, const config::ConfigUri & configUri) {
+ LockGuard guard(_configMutex);
+ _components.reset(
+ new Components(exceptionRethrower,
+ configUri,
+ *_zooKeepersConfig,
+ *_fileDistributorConfig,
+ *_rpcConfig));
+
+ configureSpeedLimits(*_fileDistributorConfig);
+ _completeReconfigurationNeeded = false;
+ }
+
+ bool completeReconfigurationNeeded() {
+ LockGuard guard(_configMutex);
+ if (_completeReconfigurationNeeded) {
+ LOG(debug, "Complete reconfiguration needed");
+ }
+ return _completeReconfigurationNeeded;
+ }
+
+ void configureSpeedLimits(const FiledistributorConfig& config) {
+ FileDownloader& downloader = *_components->_downloader;
+ downloader.setMaxDownloadSpeed(config.maxdownloadspeed);
+ downloader.setMaxUploadSpeed(config.maxuploadspeed);
+ }
+
+ //avoid warning due to scope exit macro
+#pragma GCC diagnostic ignored "-Wshadow"
+ void runImpl(const config::ConfigUri & configUri) {
+
+ BOOST_SCOPE_EXIT((&_components)(&_exceptionRethrower)) {
+ _components.reset();
+ //Ensures that any exception stored during destruction will be available when returning.
+ ensureExceptionsStored(_exceptionRethrower);
+ } BOOST_SCOPE_EXIT_END
+
+ createComponents(_exceptionRethrower, configUri);
+
+ // We do not want back to back reinitializing as it gives zero time for serving
+ // some torrents.
+ int postPoneAskedToReinitializedSecs = 50;
+
+ while (!askedToShutDown() &&
+ (postPoneAskedToReinitializedSecs > 0 || !askedToReinitialize()) &&
+ !completeReconfigurationNeeded() &&
+ !_exceptionRethrower->exceptionStored()) {
+ postPoneAskedToReinitializedSecs--;
+ boost::this_thread::sleep(boost::posix_time::seconds(1));
+ }
+ }
+};
+
+//TODO: use pop in gcc 4.6
+#pragma GCC diagnostic warning "-Wshadow"
+
+class FileDistributorApplication : public FastOS_Application {
+ const config::ConfigUri _configUri;
+public:
+ FileDistributorApplication(const config::ConfigUri & configUri);
+
+ //overrides
+ int Main();
+};
+
+namespace {
+
+struct ProgramOptionException {
+ std::string _msg;
+ ProgramOptionException(const std::string & msg)
+ : _msg(msg)
+ {}
+};
+
+bool exists(const std::string& optionName, const boost::program_options::variables_map& map) {
+ return map.find(optionName) != map.end();
+}
+
+void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map \
+ ) {
+ if (!exists(optionName, map)) {
+ throw ProgramOptionException("Error: Missing option " + optionName);
+ }
+}
+
+} //anonymous namespace
+
+FileDistributorApplication::FileDistributorApplication(const config::ConfigUri & configUri)
+ :_configUri(configUri) {
+}
+
+int
+FileDistributorApplication::Main() {
+ try {
+ FileDistributor distributor;
+
+ config::ConfigFetcher configFetcher(_configUri.getContext());
+ configFetcher.subscribe<ZookeepersConfig>(_configUri.getConfigId(), &distributor);
+ configFetcher.subscribe<FiledistributorConfig>(_configUri.getConfigId(), &distributor);
+ configFetcher.subscribe<FiledistributorrpcConfig>(_configUri.getConfigId(), &distributor);
+ configFetcher.subscribeGenerationChanges(&distributor);
+ configFetcher.start();
+
+ distributor.run(_configUri);
+
+ EV_STOPPING(programName, "Clean exit");
+ return 0;
+ } catch(const FileDoesNotExistException & e) {
+ std::string s = boost::diagnostic_information(e);
+ EV_STOPPING(programName, s.c_str());
+ return -1;
+ } catch(const ZKNodeDoesNotExistsException & e) {
+ std::string s = boost::diagnostic_information(e);
+ EV_STOPPING(programName, s.c_str());
+ return -2;
+ } catch(const ZKGenericException & e) {
+ std::string s = boost::diagnostic_information(e);
+ EV_STOPPING(programName, s.c_str());
+ return -3;
+ } catch(const boost::unknown_exception & e) {
+ std::string s = boost::diagnostic_information(e);
+ LOG(warning, "Caught '%s'", s.c_str());
+ EV_STOPPING(programName, s.c_str());
+ return -4;
+#if 0
+ } catch(const boost::exception& e) {
+ std::string s = boost::diagnostic_information(e);
+ LOG(error, "Caught '%s'", s.c_str());
+ EV_STOPPING(programName, s.c_str());
+ return -1;
+ } catch(const std::string& msg) {
+ std::string s = "Error: " + msg;
+ LOG(error, "Caught '%s'", s.c_str());
+ EV_STOPPING(programName, s.c_str());
+ return -1;
+#endif
+ }
+}
+
+int
+executeApplication(int argc, char** argv) {
+ const char
+ *configId("configid"),
+ *help("help");
+
+ namespace po = boost::program_options;
+ po::options_description description;
+ description.add_options()
+ (configId, po::value<std::string > (), "id to request config for")
+ (help, "help");
+
+ try {
+ po::variables_map values;
+ po::store(
+ po::parse_command_line(argc, argv, description),
+ values);
+
+ if (exists(help, values)) {
+ std::cout <<description;
+ return 0;
+ }
+ ensureExists(configId, values);
+
+ FileDistributorApplication application(
+ values[configId].as<std::string > ());
+ return application.Entry(argc, argv);
+
+ } catch(ProgramOptionException& e) {
+ std::cerr <<e._msg <<std::endl;
+ return -1;
+ }
+}
+
+void terminate() {
+ std::cerr <<"Terminate called: " <<std::endl;
+ Backtrace backtrace;
+ std::cerr <<backtrace <<std::endl;
+}
+
+int
+main(int argc, char** argv) {
+ EV_STARTED(programName);
+ initSignals();
+
+ std::srand(std::time(0));
+ std::set_terminate(terminate);
+ filedistribution::setupZooKeeperLogging();
+
+ return executeApplication(argc, argv);
+}
diff --git a/filedistribution/src/apps/status/.gitignore b/filedistribution/src/apps/status/.gitignore
new file mode 100644
index 00000000000..6dc1c1fff5d
--- /dev/null
+++ b/filedistribution/src/apps/status/.gitignore
@@ -0,0 +1,2 @@
+/status-filedistribution
+filedistribution_status-filedistribution_app
diff --git a/filedistribution/src/apps/status/CMakeLists.txt b/filedistribution/src/apps/status/CMakeLists.txt
new file mode 100644
index 00000000000..fb0b76af55b
--- /dev/null
+++ b/filedistribution/src/apps/status/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_status-filedistribution_app
+ SOURCES
+ status-filedistribution.cpp
+ INSTALL bin
+ DEPENDS
+ filedistribution_filedistributionmodel
+ filedistribution_common
+)
+target_compile_options(filedistribution_status-filedistribution_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_status-filedistribution_app boost boost_unit_test_framework-mt-d)
+vespa_install_script(vespa-status-filedistribution.sh vespa-status-filedistribution bin)
diff --git a/filedistribution/src/apps/status/status-filedistribution.cpp b/filedistribution/src/apps/status/status-filedistribution.cpp
new file mode 100644
index 00000000000..90c21623016
--- /dev/null
+++ b/filedistribution/src/apps/status/status-filedistribution.cpp
@@ -0,0 +1,185 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("status-filedistribution");
+
+#include <iostream>
+#include <map>
+
+#include <boost/program_options.hpp>
+#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
+
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+#include <vespa/filedistribution/model/zkfacade.h>
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
+#include <zookeeper/zookeeper.h>
+
+using namespace filedistribution;
+
+std::string
+plural(size_t size)
+{
+ if (size == 1)
+ return "";
+ else
+ return "s";
+}
+
+template <class CONT>
+std::string
+plural(const CONT& cont)
+{
+ size_t size = cont.size();
+ return plural(size);
+}
+
+typedef FileDBModel::HostStatus HostStatus;
+typedef std::map<std::string, HostStatus> StatusByHostName;
+
+void
+printWaitingForHosts(const StatusByHostName& notFinishedHosts)
+{
+ std::cout <<"Waiting for the following host" <<plural(notFinishedHosts) <<":" <<std::endl;
+ BOOST_FOREACH(const StatusByHostName::value_type hostNameAndStatus, notFinishedHosts) {
+ std::cout <<hostNameAndStatus.first <<" (";
+
+ const HostStatus& hostStatus = hostNameAndStatus.second;
+ if (hostStatus._state == HostStatus::notStarted) {
+ std::cout <<"Not started";
+ } else {
+ std::cout <<"Downloading, "
+ <<hostStatus._numFilesFinished <<"/" <<hostStatus._numFilesToDownload
+ <<" file" <<plural(hostStatus._numFilesToDownload) <<" completed";
+ }
+ std::cout <<")" <<std::endl;
+ }
+}
+
+//TODO:refactor
+int printStatus(const std::string& zkservers)
+{
+ boost::shared_ptr<ExceptionRethrower> exceptionRethrower;
+ boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkservers, exceptionRethrower));
+
+ boost::shared_ptr<FileDBModel> model(new ZKFileDBModel(zk));
+
+ std::vector<std::string> hosts = model->getHosts();
+
+ StatusByHostName notFinishedHosts;
+ StatusByHostName finishedHosts;
+ bool hasStarted = false;
+
+ BOOST_FOREACH(std::string host, hosts) {
+ HostStatus hostStatus = model->getHostStatus(host);
+ switch (hostStatus._state) {
+ case HostStatus::finished:
+ hasStarted = true;
+ finishedHosts[host] = hostStatus;
+ break;
+ case HostStatus::inProgress:
+ hasStarted = true;
+ case HostStatus::notStarted:
+ notFinishedHosts[host] = hostStatus;
+ break;
+ }
+ }
+
+ if (notFinishedHosts.empty()) {
+ std::cout <<"Finished distributing files to all hosts." <<std::endl;
+ return 0;
+ } else if (!hasStarted) {
+ std::cout <<"File distribution has not yet started." <<std::endl;
+ return 0;
+ } else {
+ printWaitingForHosts(notFinishedHosts);
+ return 5;
+ }
+}
+
+int
+printStatusRetryIfZKProblem(const std::string& zkservers, const std::string& zkLogFile)
+{
+ FILE* file = fopen(zkLogFile.c_str(), "w");
+ if (file == NULL) {
+ std::cerr <<"Could not open file " <<zkLogFile <<std::endl;
+ } else {
+ zoo_set_log_stream(file);
+ }
+ zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
+
+ for (int i = 0; i < 5; ++i) {
+ try {
+ return printStatus(zkservers);
+ } catch (ZKNodeDoesNotExistsException& e) {
+ LOG(debug, "Node does not exists, assuming concurrent update. %s", boost::diagnostic_information(e).c_str());
+
+ } catch (ZKSessionExpired& e) {
+ LOG(debug, "Session expired.");
+ }
+ boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+ }
+ return 4;
+}
+
+
+//TODO: move to common
+struct ProgramOptionException {
+ std::string _msg;
+ ProgramOptionException(const std::string & msg)
+ : _msg(msg)
+ {}
+};
+
+bool exists(const std::string& optionName, const boost::program_options::variables_map& map) {
+ return map.find(optionName) != map.end();
+}
+
+void ensureExists(const std::string& optionName, const boost::program_options::variables_map& map \
+ ) {
+ if (!exists(optionName, map)) {
+ throw ProgramOptionException("Error: Missing option " + optionName);
+ }
+}
+//END: move to common
+
+
+int
+main(int argc, char** argv) {
+ const char
+ *zkstring = "zkstring",
+ *zkLogFile = "zkLogFile",
+ *help = "help";
+
+ namespace po = boost::program_options;
+ boost::program_options::options_description description;
+ description.add_options()
+ (zkstring, po::value<std::string > (), "The zookeeper servers to connect to, separated by comma")
+ (zkLogFile, po::value<std::string >() -> default_value("/dev/null"), "Zookeeper log file")
+ (help, "help");
+
+ try {
+ boost::program_options::variables_map values;
+ po::store(
+ boost::program_options::parse_command_line(argc, argv, description),
+ values);
+
+ if (exists(help, values)) {
+ std::cout <<description;
+ return 0;
+ }
+
+ ensureExists(zkstring, values);
+
+ return printStatusRetryIfZKProblem(
+ values[zkstring] .as<std::string>(),
+ values[zkLogFile].as<std::string>());
+ } catch (const ProgramOptionException& e) {
+ std::cerr <<e._msg <<std::endl;
+ return 3;
+ } catch(const std::exception& e) {
+ std::cerr <<"Error: " <<e.what() <<std::endl;
+ return 3;
+ }
+}
diff --git a/filedistribution/src/apps/status/vespa-status-filedistribution.sh b/filedistribution/src/apps/status/vespa-status-filedistribution.sh
new file mode 100644
index 00000000000..477e4f6bff5
--- /dev/null
+++ b/filedistribution/src/apps/status/vespa-status-filedistribution.sh
@@ -0,0 +1,68 @@
+#!/bin/sh
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+# BEGIN environment bootstrap section
+# Do not edit between here and END as this section should stay identical in all scripts
+
+findpath () {
+ myname=${0}
+ mypath=${myname%/*}
+ myname=${myname##*/}
+ if [ "$mypath" ] && [ -d "$mypath" ]; then
+ return
+ fi
+ mypath=$(pwd)
+ if [ -f "${mypath}/${myname}" ]; then
+ return
+ fi
+ echo "FATAL: Could not figure out the path where $myname lives from $0"
+ exit 1
+}
+
+COMMON_ENV=libexec/vespa/common-env.sh
+
+source_common_env () {
+ if [ "$VESPA_HOME" ] && [ -d "$VESPA_HOME" ]; then
+ # ensure it ends with "/" :
+ VESPA_HOME=${VESPA_HOME%/}/
+ export VESPA_HOME
+ common_env=$VESPA_HOME/$COMMON_ENV
+ if [ -f "$common_env" ]; then
+ . $common_env
+ return
+ fi
+ fi
+ return 1
+}
+
+findroot () {
+ source_common_env && return
+ if [ "$VESPA_HOME" ]; then
+ echo "FATAL: bad VESPA_HOME value '$VESPA_HOME'"
+ exit 1
+ fi
+ if [ "$ROOT" ] && [ -d "$ROOT" ]; then
+ VESPA_HOME="$ROOT"
+ source_common_env && return
+ fi
+ findpath
+ while [ "$mypath" ]; do
+ VESPA_HOME=${mypath}
+ source_common_env && return
+ mypath=${mypath%/*}
+ done
+ echo "FATAL: missing VESPA_HOME environment variable"
+ echo "Could not locate $COMMON_ENV anywhere"
+ exit 1
+}
+
+findroot
+
+# END environment bootstrap section
+
+ROOT=$VESPA_HOME
+
+ZKSTRING=$($ROOT/libexec/vespa/vespa-config.pl -zkstring)
+test -z "$VESPA_LOG_LEVEL" && VESPA_LOG_LEVEL=warning
+export VESPA_LOG_LEVEL
+exec $ROOT/libexec/vespa/status-filedistribution --zkstring "$ZKSTRING" $@
diff --git a/filedistribution/src/tests/.gitignore b/filedistribution/src/tests/.gitignore
new file mode 100644
index 00000000000..ee50e13466c
--- /dev/null
+++ b/filedistribution/src/tests/.gitignore
@@ -0,0 +1,2 @@
+Makefile
+*_test
diff --git a/filedistribution/src/tests/common/.gitignore b/filedistribution/src/tests/common/.gitignore
new file mode 100644
index 00000000000..060721ea295
--- /dev/null
+++ b/filedistribution/src/tests/common/.gitignore
@@ -0,0 +1 @@
+filedistribution_common_test_app
diff --git a/filedistribution/src/tests/common/CMakeLists.txt b/filedistribution/src/tests/common/CMakeLists.txt
new file mode 100644
index 00000000000..b9670848fbc
--- /dev/null
+++ b/filedistribution/src/tests/common/CMakeLists.txt
@@ -0,0 +1,13 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_common_test_app
+ SOURCES
+ testCommon.cpp
+ DEPENDS
+)
+target_compile_options(filedistribution_common_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_common_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_common_test_app NO_VALGRIND COMMAND filedistribution_common_test_app)
diff --git a/filedistribution/src/tests/common/testCommon.cpp b/filedistribution/src/tests/common/testCommon.cpp
new file mode 100644
index 00000000000..699d3628547
--- /dev/null
+++ b/filedistribution/src/tests/common/testCommon.cpp
@@ -0,0 +1,42 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/filedistribution/common/buffer.h>
+
+#include <boost/test/unit_test.hpp>
+#include <string>
+
+namespace fd = filedistribution;
+
+const size_t bufferCapacity = 10;
+
+fd::Move<fd::Buffer>
+getBuffer() {
+ const char* test = "test";
+ fd::Buffer buffer(test, test + strlen(test));
+ buffer.reserve(bufferCapacity);
+ buffer.push_back(0);
+ return fd::move(buffer);
+}
+
+BOOST_AUTO_TEST_CASE(bufferTest) {
+ fd::Buffer buffer(getBuffer());
+ BOOST_CHECK(buffer.begin() != 0);
+ BOOST_CHECK_EQUAL(bufferCapacity, buffer.capacity());
+ BOOST_CHECK_EQUAL(5u, buffer.size());
+ BOOST_CHECK_EQUAL(std::string("test"), buffer.begin());
+}
+
+struct Callback {
+ bool* _called;
+ Callback(bool *called)
+ :_called(called)
+ {}
+
+ void operator()(const std::string& str) {
+ BOOST_CHECK_EQUAL("abcd", str);
+ *_called = true;
+ }
+};
diff --git a/filedistribution/src/tests/filedbmodelimpl/.gitignore b/filedistribution/src/tests/filedbmodelimpl/.gitignore
new file mode 100644
index 00000000000..05a25df1258
--- /dev/null
+++ b/filedistribution/src/tests/filedbmodelimpl/.gitignore
@@ -0,0 +1 @@
+filedistribution_filedbmodelimpl_test_app
diff --git a/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt b/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt
new file mode 100644
index 00000000000..9bd7bd14ebb
--- /dev/null
+++ b/filedistribution/src/tests/filedbmodelimpl/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_filedbmodelimpl_test_app
+ SOURCES
+ test-filedistributionmodelimpl.cpp
+ DEPENDS
+ filedistribution_filedistributionmodel
+ filedistribution_common
+ filedistribution_mocks
+)
+target_compile_options(filedistribution_filedbmodelimpl_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedbmodelimpl_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_filedbmodelimpl_test_app NO_VALGRIND COMMAND filedistribution_filedbmodelimpl_test_app)
diff --git a/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
new file mode 100644
index 00000000000..84d1b5b958c
--- /dev/null
+++ b/filedistribution/src/tests/filedbmodelimpl/test-filedistributionmodelimpl.cpp
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+#define BOOST_TEST_MODULE filedbmodelimpl test
+#include <vespa/fastos/fastos.h>
+#include <boost/test/unit_test.hpp>
+
+#include <iostream>
+#include <vector>
+#include <vespa/filedistribution/common/componentsdeleter.h>
+#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
+#include <vespa/filedistribution/model/zkfacade.h>
+#include <zookeeper/zookeeper.h>
+
+
+using namespace filedistribution;
+
+
+namespace {
+
+
+struct Fixture {
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ ComponentsDeleter _componentsDeleter;
+ boost::shared_ptr<ZKFacade> _zk;
+ boost::shared_ptr<FileDistributionModelImpl> _distModel;
+ Fixture() {
+ _exceptionRethrower.reset(new ExceptionRethrower());
+ _zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower));
+ _distModel.reset(new FileDistributionModelImpl("hostname", 12345, _zk, _exceptionRethrower));
+ }
+ ~Fixture() { }
+};
+
+} //anonymous namespace
+
+
+BOOST_FIXTURE_TEST_SUITE(FileDistributionModelImplTests, Fixture)
+
+BOOST_AUTO_TEST_CASE(configServersAsPeers)
+{
+ std::vector<std::string> peers;
+ peers.push_back("old");
+ peers.push_back("config:123");
+ peers.push_back("config:567");
+ peers.push_back("foo:123");
+ _distModel->addConfigServersAsPeers(peers, "config,configTwo", 123);
+ BOOST_CHECK(peers.size() == 5);
+ BOOST_CHECK(peers[4] == "configTwo:123");
+ _distModel->addConfigServersAsPeers(peers, NULL, 123);
+ BOOST_CHECK(peers.size() == 5);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/tests/filedownloader/.gitignore b/filedistribution/src/tests/filedownloader/.gitignore
new file mode 100644
index 00000000000..f7bb72b4791
--- /dev/null
+++ b/filedistribution/src/tests/filedownloader/.gitignore
@@ -0,0 +1 @@
+filedistribution_filedownloader_test_app
diff --git a/filedistribution/src/tests/filedownloader/CMakeLists.txt b/filedistribution/src/tests/filedownloader/CMakeLists.txt
new file mode 100644
index 00000000000..6be9b28651d
--- /dev/null
+++ b/filedistribution/src/tests/filedownloader/CMakeLists.txt
@@ -0,0 +1,15 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_filedownloader_test_app
+ SOURCES
+ testfiledownloader.cpp
+ DEPENDS
+ filedistribution_filedistributionmodel
+ filedistribution_common
+)
+target_compile_options(filedistribution_filedownloader_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_filedownloader_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_filedownloader_test_app NO_VALGRIND COMMAND filedistribution_filedownloader_test_app)
diff --git a/filedistribution/src/tests/filedownloader/testfiledownloader.cpp b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
new file mode 100644
index 00000000000..94b505f4f9f
--- /dev/null
+++ b/filedistribution/src/tests/filedownloader/testfiledownloader.cpp
@@ -0,0 +1,158 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/filedistribution/distributor/filedownloader.h>
+#include <vespa/filedistribution/distributor/filedistributortrackerimpl.h>
+
+#include <fstream>
+
+#include <boost/test/unit_test.hpp>
+#include <boost/filesystem.hpp>
+#include <boost/thread.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/filesystem/fstream.hpp>
+
+#include <libtorrent/session.hpp>
+#include <libtorrent/tracker_manager.hpp>
+#include <libtorrent/torrent.hpp>
+
+#include <vespa/filedistribution/manager/createtorrent.h>
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+#include <vespa/filedistribution/common/componentsdeleter.h>
+
+namespace fs = boost::filesystem;
+
+using namespace filedistribution;
+
+namespace {
+const std::string localHost("localhost");
+const int uploaderPort = 9113;
+const int downloaderPort = 9112;
+
+#if 0
+boost::shared_ptr<FileDownloader>
+createDownloader(ComponentsDeleter& deleter,
+ int port, const fs::path& downloaderPath,
+ const boost::shared_ptr<FileDistributionModel>& model,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
+{
+ boost::shared_ptr<FileDistributorTrackerImpl> tracker(deleter.track(new FileDistributorTrackerImpl(model, exceptionRethrower)));
+ boost::shared_ptr<FileDownloader> downloader(deleter.track(new FileDownloader(tracker,
+ localHost, port, downloaderPath, exceptionRethrower)));
+
+ tracker->setDownloader(downloader);
+ return downloader;
+}
+#endif
+
+} //anonymous namespace
+
+class MockFileDistributionModel : public FileDistributionModel {
+ virtual FileDBModel& getFileDBModel() {
+ abort();
+ }
+
+ virtual std::set<std::string> getFilesToDownload() {
+ return std::set<std::string>();
+ }
+
+ virtual PeerEntries getPeers(const std::string& , size_t) {
+ PeerEntries peers(2);
+ peers[0].ip = localHost;
+ peers[0].port = uploaderPort;
+
+ peers[1].ip = localHost;
+ peers[1].port = downloaderPort;
+
+ return peers;
+ }
+
+ virtual void addPeer(const std::string&) {}
+ virtual void removePeer(const std::string&) {}
+ virtual void peerFinished(const std::string&) {}
+};
+
+
+#if 0
+BOOST_AUTO_TEST_CASE(fileDownloaderTest) {
+ fs::path testPath = "/tmp/filedownloadertest";
+ fs::remove_all(testPath);
+
+ fs::path downloaderPath = testPath / "downloader";
+ fs::path uploaderPath = testPath / "uploader";
+
+ const std::string fileReference = "0123456789012345678901234567890123456789";
+ const std::string fileToSend = "filetosend.txt";
+
+ fs::create_directories(downloaderPath);
+ fs::create_directories(uploaderPath / fileReference);
+
+ fs::path fileToUploadPath = uploaderPath / fileReference / "filetosend.txt";
+
+ {
+ fs::ofstream stream(fileToUploadPath);
+ stream <<"Hello, world!" <<std::endl;
+ }
+
+ CreateTorrent createTorrent(fileToUploadPath);
+ Buffer buffer(createTorrent.bencode());
+
+ ComponentsDeleter deleter;
+ boost::shared_ptr<ExceptionRethrower> exceptionRethrower(new ExceptionRethrower());
+
+ boost::shared_ptr<FileDistributionModel> model(deleter.track(new MockFileDistributionModel()));
+ boost::shared_ptr<FileDownloader> downloader =
+ createDownloader(deleter, downloaderPort, downloaderPath, model, exceptionRethrower);
+
+ boost::shared_ptr<FileDownloader> uploader =
+ createDownloader(deleter, uploaderPort, uploaderPath, model, exceptionRethrower);
+
+ boost::thread uploaderThread(
+ boost::lambda::bind(&FileDownloader::runEventLoop, uploader.get()));
+
+ boost::thread downloaderThread(
+ boost::lambda::bind(&FileDownloader::runEventLoop, downloader.get()));
+
+ uploader->addTorrent(fileReference, buffer);
+ downloader->addTorrent(fileReference, buffer);
+
+ sleep(5);
+ BOOST_CHECK(fs::exists(downloaderPath / fileReference / fileToSend));
+ BOOST_CHECK(!exceptionRethrower->exceptionStored());
+
+ uploaderThread.interrupt();
+ uploaderThread.join();
+
+ downloaderThread.interrupt();
+ downloaderThread.join();
+
+ fs::remove_all(testPath);
+}
+#endif
+
+//TODO: cleanup
+libtorrent::sha1_hash
+toInfoHash(const std::string& fileReference) {
+ assert (fileReference.size() == 40);
+ std::istringstream s(fileReference);
+
+ libtorrent::sha1_hash infoHash;
+ s >> infoHash;
+ return infoHash;
+}
+
+BOOST_AUTO_TEST_CASE(test_filereference_infohash_conversion) {
+ const std::string fileReference = "3a281c905c9b6ebe4d969037a198454fedefbdf3";
+
+ libtorrent::sha1_hash infoHash = toInfoHash(fileReference);
+
+ std::ostringstream fileReferenceString;
+ fileReferenceString <<infoHash;
+
+ BOOST_CHECK(fileReference == fileReferenceString.str());
+
+ std::cout <<fileReference <<std::endl <<fileReferenceString.str() <<std::endl;
+}
diff --git a/filedistribution/src/tests/lib/CMakeLists.txt b/filedistribution/src/tests/lib/CMakeLists.txt
new file mode 100644
index 00000000000..2c710f315e8
--- /dev/null
+++ b/filedistribution/src/tests/lib/CMakeLists.txt
@@ -0,0 +1,6 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_mocks STATIC
+ SOURCES
+ mock-zookeeper.cpp
+ DEPENDS
+)
diff --git a/filedistribution/src/tests/lib/mock-zookeeper.cpp b/filedistribution/src/tests/lib/mock-zookeeper.cpp
new file mode 100644
index 00000000000..82cd03a268e
--- /dev/null
+++ b/filedistribution/src/tests/lib/mock-zookeeper.cpp
@@ -0,0 +1,326 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <zookeeper/zookeeper.h>
+
+#include <string>
+#include <map>
+#include <cassert>
+#include <cstring>
+#include <vector>
+
+#include <boost/thread.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include <iostream>
+
+#include <vespa/filedistribution/common/concurrentqueue.h>
+
+using std::map;
+using std::string;
+using std::vector;
+using std::pair;
+using std::make_pair;
+using filedistribution::ConcurrentQueue;
+
+namespace {
+std::pair<string, string> parentPathAndChildName(const string& childPath)
+{
+ if (childPath.empty()) {
+ return std::make_pair("", "");
+ } else {
+ assert (childPath[0] == '/');
+
+ size_t index = childPath.find_last_of("/");
+ return std::make_pair(childPath.substr(0, index), childPath.substr(index + 1));
+ }
+}
+
+struct Node {
+ typedef map<string, Node> Children;
+ Children children;
+ bool exists;
+ bool ephemeral;
+ vector<char> buffer;
+ vector<pair<watcher_fn, void*> > watchers;
+
+ Node()
+ :exists(false),
+ ephemeral(false)
+ {}
+
+ void addWatcher(watcher_fn fn, void* context) {
+ if (fn)
+ watchers.push_back(make_pair(fn, context));
+ }
+
+ void triggerWatches(zhandle_t* zh, const std::string& path);
+};
+
+boost::shared_ptr<Node> sharedRoot;
+
+struct ZHandle {
+ struct Worker {
+ ZHandle& zhandle;
+
+ Worker(ZHandle* parent) : zhandle(*parent) {}
+
+ void operator()();
+ };
+
+ int sequence;
+
+ boost::shared_ptr<Node> root;
+ boost::thread _watchersThread;
+ vector<string> ephemeralNodes;
+
+ typedef boost::function<void (void)> InvokeWatcherFun;
+ ConcurrentQueue<InvokeWatcherFun> watcherInvocations;
+
+ Node& getNode(const string& path);
+
+ Node& getParent(const string& path);
+
+ void ephemeralNode(const string&path) {
+ ephemeralNodes.push_back(path);
+ }
+
+ ZHandle() : sequence(0), _watchersThread(Worker(this)) {
+ if (!sharedRoot)
+ sharedRoot.reset(new Node());
+
+ root = sharedRoot;
+ }
+
+ ~ZHandle() {
+ std::for_each(ephemeralNodes.begin(), ephemeralNodes.end(),
+ boost::bind(&zoo_delete, (zhandle_t*)this,
+ boost::bind(&string::c_str, _1),
+ 0));
+
+ _watchersThread.interrupt();
+ _watchersThread.join();
+ }
+};
+
+void
+ZHandle::Worker::operator()()
+{
+ while (!boost::this_thread::interruption_requested()) {
+ InvokeWatcherFun fun = zhandle.watcherInvocations.pop();
+ boost::this_thread::disable_interruption di;
+ fun();
+ }
+}
+
+Node& ZHandle::getNode(const string& path) {
+ auto splittedPath = parentPathAndChildName(path);
+ if (splittedPath.second.empty()) {
+ return *root;
+ } else {
+ return getNode(splittedPath.first).children[splittedPath.second];
+ }
+}
+
+Node&
+ZHandle::getParent(const string& childPath)
+{
+ auto splittedPath = parentPathAndChildName(childPath);
+ if (splittedPath.second.empty()) {
+ throw "Can't get parent of root.";
+ } else {
+ return getNode(splittedPath.first);
+ }
+}
+
+void
+Node::triggerWatches(zhandle_t* zh, const std::string& path) {
+ for (auto i = watchers.begin(); i != watchers.end(); ++i) {
+ ((ZHandle*)zh)->watcherInvocations.push(boost::bind(i->first, zh, \
+ /*TODO: type, state*/ 0, 0,
+ boost::bind(&string::c_str, path),
+ i->second));
+ }
+ watchers.clear();
+}
+
+} //anonymous namespace
+
+extern "C" {
+
+ZOOAPI void zoo_set_debug_level(ZooLogLevel) {}
+ZOOAPI zhandle_t *zookeeper_init(const char * host, watcher_fn fn,
+ int recv_timeout, const clientid_t *clientid, void *context, int flags)
+{
+ (void)host;
+ (void)fn;
+ (void)recv_timeout;
+ (void)clientid;
+ (void)context;
+ (void)flags;
+
+ return (zhandle_t*)new ZHandle;
+}
+
+ZOOAPI int zookeeper_close(zhandle_t *zh)
+{
+ delete (ZHandle*)zh;
+ return 0;
+}
+
+ZOOAPI int zoo_create(zhandle_t *zh, const char *pathOrPrefix, const char *value,
+ int valuelen, const struct ACL_vector *, int flags,
+ char *path_buffer, int path_buffer_len)
+{
+ std::string path = pathOrPrefix;
+ if (flags & ZOO_SEQUENCE)
+ path += boost::lexical_cast<std::string>(((ZHandle*)zh)->sequence++);
+
+ strncpy(path_buffer, path.c_str(), path_buffer_len);
+ Node& node = ((ZHandle*)zh)->getNode(path);
+ node.exists = true;
+
+ if (flags & ZOO_EPHEMERAL)
+ ((ZHandle*)zh)->ephemeralNode(path);
+
+ node.buffer.resize(valuelen);
+ std::copy(value, value + valuelen, node.buffer.begin());
+
+
+ node.triggerWatches(zh, path);
+ ((ZHandle*)zh)->getParent(path).triggerWatches(zh,
+ parentPathAndChildName(path).first);
+
+ return 0;
+}
+
+
+ZOOAPI int zoo_set(zhandle_t *zh, const char *path, const char *buffer,
+ int buflen, int version) {
+ (void)version;
+
+ Node& node = ((ZHandle*)zh)->getNode(path);
+ if (!node.exists)
+ return ZNONODE;
+
+
+ node.buffer.resize(buflen);
+ std::copy(buffer, buffer + buflen, node.buffer.begin());
+
+ node.triggerWatches(zh, path);
+ return 0;
+}
+
+
+
+ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch,
+ struct String_vector *strings)
+{
+ (void)watch;
+ return zoo_wget_children(zh, path,
+ 0, 0,
+ strings);
+}
+
+ZOOAPI int zoo_wget_children(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ struct String_vector *strings)
+{
+ Node& node = ((ZHandle*)zh)->getNode(path);
+ strings->count = node.children.size();
+ strings->data = new char*[strings->count];
+
+ int index = 0;
+ for (auto i = node.children.begin(); i != node.children.end(); ++i) {
+ strings->data[index] = new char[i->first.length() + 1];
+ std::strcpy(strings->data[index], &*i->first.begin());
+ ++index;
+ }
+
+ node.addWatcher(watcher, watcherCtx);
+
+ return 0;
+}
+
+
+
+
+ZOOAPI int zoo_delete(zhandle_t *zh, const char *path, int version)
+{
+ (void)version;
+
+ std::string pathStr = path;
+ int index = pathStr.find_last_of("/");
+
+ if (pathStr.length() == 1)
+ throw "Can't delete root";
+
+ Node& parent = ((ZHandle*)zh)->getNode(pathStr.substr(0, index));
+ parent.children.erase(pathStr.substr(index + 1));
+
+ ((ZHandle*)zh)->getParent(path).triggerWatches(zh,
+ parentPathAndChildName(path).first);
+
+ return 0;
+}
+
+void zoo_set_log_stream(FILE*) {}
+
+int deallocate_String_vector(struct String_vector *v) {
+ for (int i=0; i< v->count; ++i) {
+ delete[] v->data[i];
+ }
+ delete[] v->data;
+ return 0;
+}
+
+
+ZOOAPI int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
+ int* buffer_len, struct Stat *stat)
+{
+ (void)watch;
+
+ return zoo_wget(zh, path,
+ 0, 0,
+ buffer, buffer_len, stat);
+
+}
+
+ZOOAPI int zoo_wget(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx,
+ char *buffer, int* buffer_len, struct Stat *)
+{
+ Node& node = ((ZHandle*)zh)->getNode(path);
+ std::copy(node.buffer.begin(), node.buffer.end(), buffer);
+ *buffer_len = node.buffer.size();
+
+ node.addWatcher(watcher, watcherCtx);
+ return 0;
+}
+
+ZOOAPI int zoo_wexists(zhandle_t *zh, const char *path,
+ watcher_fn watcher, void* watcherCtx, struct Stat *)
+{
+ Node& node = ((ZHandle*)zh)->getNode(path);
+
+ node.addWatcher(watcher, watcherCtx);
+ return node.exists ? ZOK : ZNONODE;
+}
+
+ZOOAPI int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
+{
+ (void)watch;
+ return zoo_wexists(zh, path,
+ 0, 0,
+ stat);
+}
+
+
+
+
+ZOOAPI ACL_vector ZOO_OPEN_ACL_UNSAFE;
+
+ZOOAPI const int ZOO_SEQUENCE = 1;
+ZOOAPI const int ZOO_EPHEMERAL = 2;
+ZOOAPI const int ZOO_SESSION_EVENT = 3;
+ZOOAPI const int ZOO_EXPIRED_SESSION_STATE = 4;
+ZOOAPI const int ZOO_AUTH_FAILED_STATE = 5;
+}
diff --git a/filedistribution/src/tests/rpc/.gitignore b/filedistribution/src/tests/rpc/.gitignore
new file mode 100644
index 00000000000..b29a47efd87
--- /dev/null
+++ b/filedistribution/src/tests/rpc/.gitignore
@@ -0,0 +1 @@
+filedistribution_rpc_test_app
diff --git a/filedistribution/src/tests/rpc/CMakeLists.txt b/filedistribution/src/tests/rpc/CMakeLists.txt
new file mode 100644
index 00000000000..9a592c15ab4
--- /dev/null
+++ b/filedistribution/src/tests/rpc/CMakeLists.txt
@@ -0,0 +1,14 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_rpc_test_app
+ SOURCES
+ testfileprovider.cpp
+ DEPENDS
+ filedistribution_filedistributorrpc
+)
+target_compile_options(filedistribution_rpc_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_rpc_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_rpc_test_app NO_VALGRIND COMMAND filedistribution_rpc_test_app)
diff --git a/filedistribution/src/tests/rpc/mockfileprovider.h b/filedistribution/src/tests/rpc/mockfileprovider.h
new file mode 100644
index 00000000000..745acc7196c
--- /dev/null
+++ b/filedistribution/src/tests/rpc/mockfileprovider.h
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/filedistribution/rpc/fileprovider.h>
+#include <boost/thread/barrier.hpp>
+
+namespace filedistribution {
+
+class MockFileProvider : public FileProvider {
+ DownloadCompletedSignal _downloadCompleted;
+ DownloadFailedSignal _downloadFailed;
+public:
+ static const std::string _queueForeverFileReference;
+
+ boost::barrier _queueForeverBarrier;
+
+ boost::optional<boost::filesystem::path> getPath(const std::string& fileReference) {
+ if (fileReference == "dd") {
+ return boost::filesystem::path("direct/result/path");
+ } else {
+ return boost::optional<boost::filesystem::path>();
+ }
+ }
+
+ void downloadFile(const std::string& fileReference) {
+ if (fileReference == _queueForeverFileReference) {
+ _queueForeverBarrier.wait();
+ return;
+ }
+
+ sleep(1);
+ downloadCompleted()(fileReference, "downloaded/path/" + fileReference);
+ }
+
+ //Overrides
+ DownloadCompletedSignal& downloadCompleted() {
+ return _downloadCompleted;
+ }
+
+ DownloadFailedSignal& downloadFailed() {
+ return _downloadFailed;
+ }
+
+ MockFileProvider()
+ :_queueForeverBarrier(2)
+ {}
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/tests/rpc/testfileprovider.cpp b/filedistribution/src/tests/rpc/testfileprovider.cpp
new file mode 100644
index 00000000000..6881eb96b8a
--- /dev/null
+++ b/filedistribution/src/tests/rpc/testfileprovider.cpp
@@ -0,0 +1,66 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+
+#include <vespa/fastos/fastos.h>
+#include "mockfileprovider.h"
+#include <vespa/filedistribution/rpc/filedistributorrpc.h>
+#include <vespa/frtstream/frtclientstream.h>
+#include <iostream>
+#include <boost/test/unit_test.hpp>
+
+namespace fd = filedistribution;
+
+using fd::MockFileProvider;
+
+const std::string MockFileProvider::_queueForeverFileReference("queue-forever");
+
+BOOST_AUTO_TEST_CASE(fileDistributionRPCTest) {
+ const std::string spec("tcp/localhost:9111");
+ boost::shared_ptr<fd::MockFileProvider> provider(new fd::MockFileProvider());
+ boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
+ fileDistributorRPC->start();
+
+ frtstream::FrtClientStream rpc(spec);
+ frtstream::Method method("waitFor");
+
+ std::string path;
+ rpc <<method <<"dd";
+ rpc >> path;
+ BOOST_CHECK_EQUAL("direct/result/path", path);
+
+ rpc <<method <<"0123456789abcdef";
+ rpc >> path;
+ BOOST_CHECK_EQUAL("downloaded/path/0123456789abcdef", path);
+}
+
+//must be run through valgrind
+BOOST_AUTO_TEST_CASE(require_that_queued_requests_does_not_leak_memory) {
+ const std::string spec("tcp/localhost:9111");
+ boost::shared_ptr<MockFileProvider> provider(new MockFileProvider());
+ boost::shared_ptr<fd::FileDistributorRPC> fileDistributorRPC(new fd::FileDistributorRPC(spec, provider));
+ fileDistributorRPC->start();
+
+ FRT_Supervisor supervisor;
+
+ supervisor.Start();
+ FRT_Target *target = supervisor.GetTarget(spec.c_str());
+
+ FRT_RPCRequest* request = supervisor.AllocRPCRequest();
+ request->SetMethodName("waitFor");
+ request->GetParams()->AddString(MockFileProvider::_queueForeverFileReference.c_str());
+ target->InvokeVoid(request);
+
+ provider->_queueForeverBarrier.wait(); //the request has been enqueued.
+ fileDistributorRPC.reset();
+
+ target->SubRef();
+ supervisor.ShutDown(true);
+
+}
+
+BOOST_AUTO_TEST_CASE(require_that_port_can_be_extracted_from_connection_spec) {
+ BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/host:9056"));
+ BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("tcp/9056"));
+ BOOST_CHECK_EQUAL(9056, fd::FileDistributorRPC::get_port("9056"));
+}
diff --git a/filedistribution/src/tests/scheduler/.gitignore b/filedistribution/src/tests/scheduler/.gitignore
new file mode 100644
index 00000000000..b1976d1c516
--- /dev/null
+++ b/filedistribution/src/tests/scheduler/.gitignore
@@ -0,0 +1 @@
+filedistribution_scheduler_test_app
diff --git a/filedistribution/src/tests/scheduler/CMakeLists.txt b/filedistribution/src/tests/scheduler/CMakeLists.txt
new file mode 100644
index 00000000000..67b86c3e634
--- /dev/null
+++ b/filedistribution/src/tests/scheduler/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_scheduler_test_app
+ SOURCES
+ test-scheduler.cpp
+ DEPENDS
+ filedistribution_distributor
+ filedistribution_filedistributionmodel
+ filedistribution_common
+)
+target_compile_options(filedistribution_scheduler_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_scheduler_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_scheduler_test_app NO_VALGRIND COMMAND filedistribution_scheduler_test_app)
diff --git a/filedistribution/src/tests/scheduler/test-scheduler.cpp b/filedistribution/src/tests/scheduler/test-scheduler.cpp
new file mode 100644
index 00000000000..cc669690a31
--- /dev/null
+++ b/filedistribution/src/tests/scheduler/test-scheduler.cpp
@@ -0,0 +1,110 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+#include <vespa/fastos/fastos.h>
+#include <boost/test/unit_test.hpp>
+
+#include <vespa/filedistribution/distributor/scheduler.h>
+
+#include <iostream>
+
+#include <boost/thread/barrier.hpp>
+
+using filedistribution::Scheduler;
+
+namespace asio = boost::asio;
+
+class TestException {};
+
+
+struct CallRun {
+ volatile bool _caughtException;
+
+ CallRun()
+ :_caughtException(false)
+ {}
+
+ void operator()(asio::io_service& ioService) {
+ while (!boost::this_thread::interruption_requested()) {
+ try {
+ //No reset needed after handling exceptions.
+ ioService.run();
+ } catch(const TestException& e ) {
+ _caughtException = true;
+ }
+ }
+ }
+};
+
+struct Fixture {
+ CallRun callRun;
+ Scheduler scheduler;
+
+ Fixture()
+ : scheduler(boost::ref(callRun))
+ {}
+};
+
+
+BOOST_FIXTURE_TEST_SUITE(SchedulerTest, Fixture)
+
+
+struct RepeatedTask : Scheduler::Task {
+ void doHandle() {
+ std::cout <<"RepeatedTask::doHandle " <<std::endl;
+ schedule(boost::posix_time::seconds(1));
+ }
+
+ RepeatedTask(Scheduler& scheduler) : Task(scheduler) {}
+};
+
+BOOST_AUTO_TEST_CASE(require_tasks_does_not_keep_scheduler_alive) {
+ RepeatedTask::SP task(new RepeatedTask(scheduler));
+ task->schedule(boost::posix_time::hours(10));
+}
+
+struct EnsureInvokedTask : Scheduler::Task {
+ boost::barrier& _barrier;
+
+ void doHandle() {
+ _barrier.wait();
+ }
+
+ EnsureInvokedTask(Scheduler& scheduler, boost::barrier& barrier) :
+ Task(scheduler),
+ _barrier(barrier)
+ {}
+};
+
+
+BOOST_AUTO_TEST_CASE(require_task_invoked) {
+ boost::barrier barrier(2);
+
+ EnsureInvokedTask::SP task(new EnsureInvokedTask(scheduler, barrier));
+ task->schedule(boost::posix_time::milliseconds(50));
+
+ barrier.wait();
+}
+
+struct ThrowExceptionTask : Scheduler::Task {
+ void doHandle() {
+ throw TestException();
+ }
+
+ ThrowExceptionTask(Scheduler& scheduler) :
+ Task(scheduler)
+ {}
+};
+
+BOOST_AUTO_TEST_CASE(require_exception_from_tasks_can_be_caught) {
+ ThrowExceptionTask::SP task(new ThrowExceptionTask(scheduler));
+ task->scheduleNow();
+
+ for (int i=0; i<200 && !callRun._caughtException; ++i) {
+ boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
+ }
+
+ BOOST_CHECK(callRun._caughtException);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/tests/status/.gitignore b/filedistribution/src/tests/status/.gitignore
new file mode 100644
index 00000000000..3da528fcd45
--- /dev/null
+++ b/filedistribution/src/tests/status/.gitignore
@@ -0,0 +1 @@
+filedistribution_status_test_app
diff --git a/filedistribution/src/tests/status/CMakeLists.txt b/filedistribution/src/tests/status/CMakeLists.txt
new file mode 100644
index 00000000000..79676c4e811
--- /dev/null
+++ b/filedistribution/src/tests/status/CMakeLists.txt
@@ -0,0 +1,15 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_status_test_app
+ SOURCES
+ test-status.cpp
+ DEPENDS
+ filedistribution_filedistributionmodel
+ filedistribution_common
+)
+target_compile_options(filedistribution_status_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_status_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_status_test_app NO_VALGRIND COMMAND filedistribution_status_test_app)
diff --git a/filedistribution/src/tests/status/test-status.cpp b/filedistribution/src/tests/status/test-status.cpp
new file mode 100644
index 00000000000..7021752f316
--- /dev/null
+++ b/filedistribution/src/tests/status/test-status.cpp
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+#include <vespa/fastos/fastos.h>
+#include <boost/test/unit_test.hpp>
+#include <boost/foreach.hpp>
+
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+#include <vespa/filedistribution/model/zkfacade.h>
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/model/filedistributionmodelimpl.h>
+
+using namespace filedistribution;
+
+
+BOOST_AUTO_TEST_CASE(test_retrieve_status) {
+ // TODO:
+}
+
diff --git a/filedistribution/src/tests/zkfacade/.gitignore b/filedistribution/src/tests/zkfacade/.gitignore
new file mode 100644
index 00000000000..6ffef2339b1
--- /dev/null
+++ b/filedistribution/src/tests/zkfacade/.gitignore
@@ -0,0 +1 @@
+filedistribution_zkfacade_test_app
diff --git a/filedistribution/src/tests/zkfacade/CMakeLists.txt b/filedistribution/src/tests/zkfacade/CMakeLists.txt
new file mode 100644
index 00000000000..2c8e05aca89
--- /dev/null
+++ b/filedistribution/src/tests/zkfacade/CMakeLists.txt
@@ -0,0 +1,15 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_zkfacade_test_app
+ SOURCES
+ test-zkfacade.cpp
+ DEPENDS
+ filedistribution_filedistributionmodel
+ filedistribution_common
+ filedistribution_mocks
+)
+target_compile_options(filedistribution_zkfacade_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfacade_test_app boost boost_unit_test_framework-mt-d)
diff --git a/filedistribution/src/tests/zkfacade/test-zkfacade.cpp b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
new file mode 100644
index 00000000000..d45e5059a53
--- /dev/null
+++ b/filedistribution/src/tests/zkfacade/test-zkfacade.cpp
@@ -0,0 +1,226 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+#define BOOST_TEST_MODULE zkfacade test
+#include <vespa/fastos/fastos.h>
+#include <boost/test/unit_test.hpp>
+
+#include <iostream>
+
+#include <boost/thread/barrier.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/checked_delete.hpp>
+
+#include <vespa/filedistribution/common/componentsdeleter.h>
+#include <vespa/filedistribution/model/zkfacade.h>
+
+#include <zookeeper/zookeeper.h>
+
+
+
+using namespace filedistribution;
+
+namespace {
+
+
+struct Watcher : public ZKFacade::NodeChangedWatcher {
+ boost::barrier _barrier;
+
+ Watcher() :
+ _barrier(2) {}
+
+ void operator()() {
+ _barrier.wait();
+ }
+};
+
+struct Fixture {
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ ComponentsDeleter _componentsDeleter;
+ boost::shared_ptr<ZKFacade> zk;
+ ZKFacade::Path testNode;
+
+ Fixture() {
+ _exceptionRethrower.reset(new ExceptionRethrower());
+
+ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
+ zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower));
+
+ testNode = "/test-node";
+ zk->removeIfExists(testNode);
+ }
+
+ ~Fixture() {
+ if (zk) {
+ zk->removeIfExists(testNode);
+ }
+ }
+};
+
+} //anonymous namespace
+
+
+BOOST_FIXTURE_TEST_SUITE(ZKFacadeTests, Fixture)
+
+BOOST_AUTO_TEST_CASE(hasNode)
+{
+ zk->setData(testNode, "", 0);
+ BOOST_CHECK(zk->hasNode(testNode));
+
+ zk->remove(testNode);
+ BOOST_CHECK(!zk->hasNode(testNode));
+}
+
+
+BOOST_AUTO_TEST_CASE(hasNodeNotification)
+{
+ boost::shared_ptr<Watcher> watcher(new Watcher);
+
+ zk->hasNode(testNode, watcher);
+ zk->setData(testNode, "", 0);
+ watcher->_barrier.wait();
+
+ //after the notification has returned, the watcher must no longer reside in watchers map.
+ for (int i=0; i<20 && !watcher.unique(); ++i) {
+ boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
+ }
+ BOOST_CHECK(watcher.unique());
+}
+
+BOOST_AUTO_TEST_CASE(getAndSetData)
+{
+ std::string inputString = "test data.";
+ Buffer inputBuffer(inputString.begin(), inputString.end());
+
+ zk->setData(testNode, inputBuffer);
+
+ Buffer outputBuffer(zk->getData(testNode));
+ std::string outputString(outputBuffer.begin(), outputBuffer.end());
+
+ BOOST_CHECK(outputString == inputString);
+
+ outputString = zk->getString(testNode);
+ BOOST_CHECK(outputString == inputString);
+}
+
+BOOST_AUTO_TEST_CASE(setDataMustExist)
+{
+ bool mustExist = true;
+ BOOST_REQUIRE_THROW(zk->setData(testNode, "", 0, mustExist), ZKNodeDoesNotExistsException);
+}
+
+BOOST_AUTO_TEST_CASE(createSequenceNode)
+{
+ zk->setData(testNode, "", 0);
+
+ ZKFacade::Path prefix = testNode / "prefix";
+ zk->createSequenceNode(prefix, "test", 4);
+ zk->createSequenceNode(prefix, "test", 4);
+ zk->createSequenceNode(prefix, "test", 4);
+
+ std::vector<std::string> children = zk->getChildren(testNode);
+ BOOST_CHECK(children.size() == 3);
+ BOOST_CHECK(children.begin()->substr(0,6) == "prefix");
+
+ Buffer buffer(zk->getData(testNode / *children.begin()));
+ std::string bufferContent(buffer.begin(), buffer.end());
+
+ BOOST_CHECK(bufferContent == "test");
+}
+
+BOOST_AUTO_TEST_CASE(retainOnly)
+{
+ zk->setData(testNode, "", 0);
+
+ zk->setData(testNode / "a", "", 0);
+ zk->setData(testNode / "b", "", 0);
+ zk->setData(testNode / "c", "", 0);
+ zk->setData(testNode / "d", "", 0);
+
+ std::vector<std::string> toRetain;
+ toRetain.push_back("a");
+ toRetain.push_back("c");
+
+ zk->retainOnly(testNode, toRetain);
+ std::vector<std::string> children = zk->getChildren(testNode);
+
+ std::sort(children.begin(), children.end());
+ BOOST_CHECK(children == toRetain);
+}
+
+
+
+BOOST_AUTO_TEST_CASE(addEphemeralNode)
+{
+ ZKFacade::Path ephemeralNode = "/test-ephemeral-node";
+ zk->removeIfExists(ephemeralNode);
+
+ //Checked deleter is ok here since we're not installing any watchers
+ ZKFacade::SP zk2(new ZKFacade("test1-tonyv:2181", _exceptionRethrower),
+ boost::checked_deleter<ZKFacade>());
+ zk2->addEphemeralNode(ephemeralNode);
+
+ BOOST_CHECK(zk->hasNode(ephemeralNode));
+ zk2.reset();
+ BOOST_CHECK(!zk->hasNode(ephemeralNode));
+}
+
+
+
+BOOST_AUTO_TEST_CASE(dataChangedNotification)
+{
+ boost::shared_ptr<Watcher> watcher(new Watcher);
+
+ zk->setData(testNode, "", 0);
+ Buffer buffer(zk->getData(testNode, watcher));
+ BOOST_CHECK(buffer.size() == 0);
+
+ bool mustExist = true;
+ zk->setData(testNode, "test", 4, mustExist);
+ watcher->_barrier.wait();
+}
+
+BOOST_AUTO_TEST_CASE(getChildrenNotification)
+{
+ boost::shared_ptr<Watcher> watcher(new Watcher);
+
+ zk->setData(testNode, "", 0);
+ zk->getChildren(testNode, watcher);
+
+ zk->setData(testNode / "child", "", 0);
+ watcher->_barrier.wait();
+}
+
+BOOST_AUTO_TEST_CASE(require_that_zkfacade_can_be_deleted_from_callback)
+{
+ struct DeleteZKFacadeWatcher : public Watcher {
+ boost::shared_ptr<ZKFacade> _zk;
+
+ DeleteZKFacadeWatcher(const boost::shared_ptr<ZKFacade>& zk)
+ :_zk(zk)
+ {}
+
+ void operator()() {
+ BOOST_CHECK(_zk.use_count() == 2);
+ _zk.reset();
+ Watcher::operator()();
+ }
+ };
+
+ boost::shared_ptr<Watcher> watcher((Watcher*)new DeleteZKFacadeWatcher(zk));
+
+ zk->setData(testNode, "", 0);
+ zk->getData(testNode, watcher);
+
+ ZKFacade* unprotectedZk = zk.get();
+ zk.reset();
+
+ unprotectedZk->setData(testNode, "t", 1);
+ watcher->_barrier.wait();
+
+ //Must wait longer than the zookeeper_close timeout to catch
+ //problems due to closing zookeeper in a zookeeper watcher thread.
+ sleep(3);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/tests/zkfiledbmodel/.gitignore b/filedistribution/src/tests/zkfiledbmodel/.gitignore
new file mode 100644
index 00000000000..18c14c47c56
--- /dev/null
+++ b/filedistribution/src/tests/zkfiledbmodel/.gitignore
@@ -0,0 +1 @@
+filedistribution_zkfiledbmodel_test_app
diff --git a/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt b/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt
new file mode 100644
index 00000000000..7b205d009f8
--- /dev/null
+++ b/filedistribution/src/tests/zkfiledbmodel/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(filedistribution_zkfiledbmodel_test_app
+ SOURCES
+ test-zkfiledbmodel.cpp
+ DEPENDS
+ filedistribution_filedistributionmodel
+ filedistribution_common
+ filedistribution_mocks
+)
+target_compile_options(filedistribution_zkfiledbmodel_test_app PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_zkfiledbmodel_test_app boost boost_unit_test_framework-mt-d)
+vespa_add_test(NAME filedistribution_zkfiledbmodel_test_app NO_VALGRIND COMMAND filedistribution_zkfiledbmodel_test_app)
diff --git a/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
new file mode 100644
index 00000000000..b385949bb98
--- /dev/null
+++ b/filedistribution/src/tests/zkfiledbmodel/test-zkfiledbmodel.cpp
@@ -0,0 +1,99 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MAIN
+#define BOOST_TEST_MODULE zkfiledbmodel test
+#include <vespa/fastos/fastos.h>
+#include <boost/test/unit_test.hpp>
+
+#include <iostream>
+
+#include <boost/thread/barrier.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/checked_delete.hpp>
+
+#include <vespa/filedistribution/common/componentsdeleter.h>
+#include <vespa/filedistribution/model/zkfacade.h>
+#include <vespa/filedistribution/model/zkfiledbmodel.h>
+
+#include <zookeeper/zookeeper.h>
+
+
+using namespace filedistribution;
+
+typedef boost::filesystem::path Path;
+
+namespace {
+
+
+struct Fixture {
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ ComponentsDeleter _componentsDeleter;
+ boost::shared_ptr<ZKFacade> zk;
+ boost::shared_ptr<ZKFileDBModel> model;
+
+ Fixture() {
+ _exceptionRethrower.reset(new ExceptionRethrower());
+
+ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
+ zk = _componentsDeleter.track(new ZKFacade("test1-tonyv:2181", _exceptionRethrower));
+ zk->setData("/vespa", "", 0);
+
+ model = _componentsDeleter.track(new ZKFileDBModel(zk));
+ }
+};
+
+} //anonymous namespace
+
+
+BOOST_FIXTURE_TEST_SUITE(ZKFileDBModelTests, Fixture)
+
+BOOST_AUTO_TEST_CASE(retainOnlyHostsForTenant)
+{
+ Path path = "/vespa/filedistribution/hosts";
+ std::vector<std::string> files = {"myfile"};
+ BOOST_CHECK(zk->hasNode("/vespa"));
+ BOOST_CHECK(zk->hasNode("/vespa/filedistribution"));
+ BOOST_CHECK(zk->hasNode(path));
+ model->setDeployedFilesToDownload("testhost", "myapp:so:cool", files);
+ model->setDeployedFilesToDownload("testhost2", "myapp:so:cool", files);
+ model->setDeployedFilesToDownload("testhost3", "myapp:so:cool", files);
+ model->setDeployedFilesToDownload("testhost3", "myapp:legacyid:so:cool", files);
+ model->setDeployedFilesToDownload("testhost3", "yourapp:so:cool", files);
+ BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1);
+ BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
+ BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 3);
+
+ model->cleanDeployedFilesToDownload({"testhost3"}, "yourapp:so:cool");
+ model->removeDeploymentsThatHaveDifferentApplicationId({"testhost3"}, "yourapp:so:cool");
+ BOOST_CHECK(zk->hasNode(path / "testhost"));
+ BOOST_CHECK(zk->hasNode(path / "testhost2"));
+ BOOST_CHECK(zk->hasNode(path / "testhost3"));
+ BOOST_CHECK(zk->getChildren(path / "testhost").size() == 1);
+ BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
+ BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1);
+
+ model->cleanDeployedFilesToDownload({"testhost"}, "myapp:not:cool");
+ model->removeDeploymentsThatHaveDifferentApplicationId({"testhost"}, "myapp:not:cool");
+ BOOST_CHECK(zk->hasNode(path / "testhost"));
+ BOOST_CHECK(zk->hasNode(path / "testhost2"));
+ BOOST_CHECK(zk->hasNode(path / "testhost3"));
+ BOOST_CHECK(zk->getChildren(path / "testhost").size() == 0);
+ BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
+ BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1);
+
+ model->cleanDeployedFilesToDownload({"testhost2"}, "myapp:so:cool");
+ model->removeDeploymentsThatHaveDifferentApplicationId({"testhost2"}, "myapp:so:cool");
+
+ BOOST_CHECK(!zk->hasNode(path / "testhost"));
+ BOOST_CHECK(zk->hasNode(path / "testhost2"));
+ BOOST_CHECK(zk->hasNode(path / "testhost3"));
+ BOOST_CHECK(zk->getChildren(path / "testhost2").size() == 1);
+ BOOST_CHECK(zk->getChildren(path / "testhost3").size() == 1);
+
+ model->cleanDeployedFilesToDownload({"testhost2"}, "yourapp:so:cool");
+ BOOST_CHECK(!zk->hasNode(path / "testhost"));
+ BOOST_CHECK(zk->hasNode(path / "testhost2"));
+ BOOST_CHECK(!zk->hasNode(path / "testhost3"));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/filedistribution/src/vespa/.gitignore b/filedistribution/src/vespa/.gitignore
new file mode 100644
index 00000000000..a477b4a6e5f
--- /dev/null
+++ b/filedistribution/src/vespa/.gitignore
@@ -0,0 +1,3 @@
+Makefile
+config-*.h
+config-*.cpp
diff --git a/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt
new file mode 100644
index 00000000000..ce692e6c7cc
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/CMakeLists.txt
@@ -0,0 +1,19 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_exceptionrethrower OBJECT
+ SOURCES
+ exception.cpp
+ DEPENDS
+)
+vespa_add_library(filedistribution_common STATIC
+ SOURCES
+ componentsdeleter.cpp
+ exception.cpp
+ vespa_logfwd.cpp
+ DEPENDS
+)
+target_compile_options(filedistribution_common PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_add_target_system_dependency(filedistribution_common boost boost_system-mt-d)
+vespa_add_target_system_dependency(filedistribution_common boost boost_thread-mt-d)
+vespa_add_target_system_dependency(filedistribution_common boost boost_program_options-mt-d)
+vespa_add_target_system_dependency(filedistribution_common boost boost_filesystem-mt-d)
+vespa_add_target_system_dependency(filedistribution_common boost boost_unit_test_framework-mt-d)
diff --git a/filedistribution/src/vespa/filedistribution/common/buffer.h b/filedistribution/src/vespa/filedistribution/common/buffer.h
new file mode 100644
index 00000000000..0c0692798ac
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/buffer.h
@@ -0,0 +1,151 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/noncopyable.hpp>
+#include <algorithm>
+
+namespace filedistribution {
+
+struct USED_FOR_MOVING {};
+
+template <class T>
+class Move {
+ mutable T _holder;
+public:
+ Move(T& toMove)
+ :_holder(USED_FOR_MOVING())
+ {
+ _holder.swap(toMove);
+ }
+
+ Move(const Move& other)
+ :_holder(USED_FOR_MOVING())
+ {
+ _holder.swap(other._holder);
+ }
+
+ void swap(T& t) const {
+ _holder.swap(t);
+ }
+
+private:
+ Move& operator=(const Move&);
+};
+
+template <class T>
+inline Move<T> move(T& t) {
+ return Move<T>(t);
+}
+
+class Buffer : boost::noncopyable {
+ size_t _capacity;
+ char* _buf;
+ size_t _size;
+public:
+ typedef char value_type;
+ typedef const value_type& const_reference;
+ typedef char* iterator;
+ typedef const char* const_iterator;
+
+ explicit Buffer(size_t capacityArg)
+ :_capacity(capacityArg),
+ _buf( new char[_capacity] ),
+ _size(0)
+ {}
+
+ Buffer(const Move<Buffer>& buffer);
+
+ explicit Buffer(USED_FOR_MOVING)
+ :_capacity(0),
+ _buf(0),
+ _size(0)
+ {}
+
+ template <typename ITER>
+ Buffer(ITER beginIter, ITER endIter)
+ : _capacity(endIter-beginIter),
+ _buf( new char[_capacity] ),
+ _size(_capacity)
+ {
+ std::copy(beginIter, endIter, begin());
+ }
+
+ ~Buffer() {
+ delete[] _buf;
+ }
+
+ size_t capacity() const {
+ return _capacity;
+ }
+
+ size_t size() const {
+ return _size;
+ }
+
+ //might expose uninitialized memory
+ void resize(size_t newSize) {
+ if ( newSize <= _capacity )
+ _size = newSize;
+ else {
+ reserve(newSize);
+ _size = newSize;
+ }
+ }
+
+ void reserve(size_t newCapacity) {
+ if ( newCapacity > _capacity ) {
+ Buffer buffer(newCapacity);
+ buffer._size = _size;
+ std::copy(begin(), end(), buffer.begin());
+ buffer.swap(*this);
+ }
+ }
+
+ void swap(Buffer& other) {
+ std::swap(_capacity, other._capacity);
+ std::swap(_buf, other._buf);
+ std::swap(_size, other._size);
+ }
+
+ void push_back(char c) {
+ if (_size == _capacity)
+ reserve(_capacity * 2);
+ _buf[_size++] = c;
+ }
+
+ iterator begin() {
+ return _buf;
+ }
+
+ iterator end() {
+ return _buf + _size;
+ }
+
+ const_iterator begin() const {
+ return _buf;
+ }
+
+ const_iterator end() const {
+ return _buf + _size;
+ }
+
+ char operator[](size_t i) const {
+ return _buf[i];
+ }
+
+ char& operator[](size_t i) {
+ return _buf[i];
+ }
+};
+
+inline Buffer::Buffer(const Move<Buffer>& buffer)
+ :_capacity(0),
+ _buf(0),
+ _size(0)
+{
+ buffer.swap(*this);
+}
+
+} //namespace filedistribution
+
+
diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
new file mode 100644
index 00000000000..74b36b77a72
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.cpp
@@ -0,0 +1,89 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "componentsdeleter.h"
+
+#include <vespa/log/log.h>
+LOG_SETUP(".componentsdeleter");
+
+#include <boost/foreach.hpp>
+
+
+using namespace filedistribution;
+
+struct ComponentsDeleter::Worker {
+ ComponentsDeleter& _parent;
+
+ Worker(ComponentsDeleter* parent)
+ :_parent(*parent) {}
+
+ void operator()();
+};
+
+
+void
+ComponentsDeleter::Worker::operator()()
+{
+ while (!boost::this_thread::interruption_requested()) {
+ try {
+ CallDeleteFun deleteFun = _parent._deleteRequests.pop();
+ boost::this_thread::disable_interruption di;
+ deleteFun();
+ } catch(const std::exception& e) {
+ LOG(error, e.what());
+ }
+ }
+}
+
+ComponentsDeleter::ComponentsDeleter()
+ :_deleterThread(Worker(this))
+{}
+
+ComponentsDeleter::~ComponentsDeleter()
+{
+ waitForAllComponentsDeleted();
+ _deleterThread.interrupt();
+ _deleterThread.join();
+}
+
+void
+ComponentsDeleter::waitForAllComponentsDeleted()
+{
+ LOG(debug, "Waiting for all components to be deleted");
+
+ for (int i=0; i<600 && !allComponentsDeleted(); ++i) {
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+ }
+ LOG(debug, "Done waiting for all components to be deleted");
+
+ logNotDeletedComponents();
+
+ if (!allComponentsDeleted())
+ kill(getpid(), SIGKILL);
+}
+
+bool
+ComponentsDeleter::allComponentsDeleted()
+{
+ LockGuard guard(_trackedComponentsMutex);
+ return _trackedComponents.empty();
+}
+
+void
+ComponentsDeleter::logNotDeletedComponents()
+{
+ LockGuard guard(_trackedComponentsMutex);
+ BOOST_FOREACH(TrackedComponentsMap::value_type component, _trackedComponents) {
+ LOG(info, "Timed out waiting for component '%s' to be deleted", component.second.c_str());
+ }
+}
+
+void
+ComponentsDeleter::removeFromTrackedComponents(void* component) {
+ LockGuard guard(_trackedComponentsMutex);
+ if (_trackedComponents.count(component))
+ LOG(debug, "Deleting '%s'", _trackedComponents[component].c_str());
+
+ size_t numErased = _trackedComponents.erase(component);
+ assert(numErased == 1);
+ (void) numErased;
+}
diff --git a/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
new file mode 100644
index 00000000000..afb122b3a2f
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/componentsdeleter.h
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <map>
+#include <typeinfo>
+#include <string>
+
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <boost/checked_delete.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/thread.hpp>
+
+#include "concurrentqueue.h"
+
+namespace filedistribution {
+/**
+ * Ensures that components are deleted in a separate thread,
+ * and that their lifetime is tracked.
+ *
+ * This prevents situations as e.g. deleting ZKFacade from a zookeeper watcher thread.
+ */
+class ComponentsDeleter : boost::noncopyable {
+ class Worker;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+
+ boost::mutex _trackedComponentsMutex;
+ typedef std::map<void*, std::string> TrackedComponentsMap;
+ TrackedComponentsMap _trackedComponents;
+
+ typedef boost::function<void (void)> CallDeleteFun;
+ ConcurrentQueue<CallDeleteFun> _deleteRequests;
+
+ boost::thread _deleterThread;
+
+ void removeFromTrackedComponents(void* component);
+
+ template <class T>
+ void deleteComponent(T* component) {
+ removeFromTrackedComponents(component);
+ delete component;
+ }
+
+ template <class T>
+ void requestDelete(T* component) {
+ _deleteRequests.push(boost::bind(&ComponentsDeleter::deleteComponent<T>, this, component));
+ }
+
+ void waitForAllComponentsDeleted();
+ bool allComponentsDeleted();
+ void logNotDeletedComponents();
+ public:
+ ComponentsDeleter();
+
+ /*
+ * Waits blocking for up to 60 seconds until all components are deleted.
+ * If it fails, the application is killed.
+ */
+ ~ComponentsDeleter();
+
+ template <class T>
+ boost::shared_ptr<T> track(T* t) {
+ LockGuard guard(_trackedComponentsMutex);
+
+ _trackedComponents[t] = typeid(t).name();
+ return boost::shared_ptr<T>(t, boost::bind(&ComponentsDeleter::requestDelete<T>, this, t));
+ }
+};
+}
+
diff --git a/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
new file mode 100644
index 00000000000..056ba3153a2
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/concurrentqueue.h
@@ -0,0 +1,53 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <queue>
+
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
+
+namespace filedistribution {
+
+template <typename T>
+class ConcurrentQueue {
+public:
+ typedef T value_type;
+private:
+ boost::condition_variable _nonEmpty;
+
+ mutable boost::mutex _queueMutex;
+ typedef boost::unique_lock<boost::mutex> UniqueLock;
+
+ std::queue<value_type> _queue;
+
+public:
+ void push(const T& t) {
+ {
+ UniqueLock guard(_queueMutex);
+ _queue.push(t);
+ }
+ _nonEmpty.notify_one();
+ }
+
+ //Assumes that value_type has nonthrow copy constructors
+ const T pop() {
+ UniqueLock guard(_queueMutex);
+ while (_queue.empty()) {
+ _nonEmpty.wait(guard);
+ }
+ T result = _queue.front();
+ _queue.pop();
+ return result;
+ }
+
+ void clear() {
+ UniqueLock guard(_queueMutex);
+ while (!_queue.empty()) {
+ _queue.pop();
+ }
+ }
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/common/exception.cpp b/filedistribution/src/vespa/filedistribution/common/exception.cpp
new file mode 100644
index 00000000000..7195a99d702
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/exception.cpp
@@ -0,0 +1,24 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "exception.h"
+
+#include <execinfo.h>
+
+std::ostream&
+filedistribution::operator<<(std::ostream& stream, const Backtrace& backtrace) {
+ char** strings = backtrace_symbols(
+ &*backtrace._frames.begin(), backtrace._size);
+
+ stream <<"Backtrace:" <<std::endl;
+ for (size_t i = 0; i<backtrace._size; ++i) {
+ stream <<strings[i] <<std::endl;
+ }
+
+ free(strings);
+ return stream;
+}
+
+
+filedistribution::Backtrace::Backtrace()
+ :_size(backtrace(&*_frames.begin(), _frames.size()))
+{}
diff --git a/filedistribution/src/vespa/filedistribution/common/exception.h b/filedistribution/src/vespa/filedistribution/common/exception.h
new file mode 100644
index 00000000000..1d15253f883
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/exception.h
@@ -0,0 +1,65 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/exception/all.hpp>
+#include <boost/current_function.hpp>
+#include <boost/exception/all.hpp>
+#include <boost/array.hpp>
+#include <boost/version.hpp>
+
+namespace filedistribution {
+
+class Backtrace {
+ static const size_t _maxBacktraceSize = 200;
+ public:
+ boost::array<void*, _maxBacktraceSize> _frames;
+ const size_t _size;
+
+ Backtrace();
+};
+
+
+std::ostream& operator<<(std::ostream& stream, const Backtrace& backtrace);
+
+namespace errorinfo {
+typedef boost::error_info<struct tag_Backtrace, Backtrace> Backtrace;
+typedef boost::error_info<struct tag_UserMessage, Backtrace> ExplanationForUser;
+}
+
+//Exceptions should inherit virtually from boost and std exception,
+//see http://www.boost.org/doc/libs/1_39_0/libs/exception/doc/using_virtual_inheritance_in_exception_types.html
+struct Exception : virtual boost::exception, virtual std::exception {
+ Exception() {
+ *this << errorinfo::Backtrace(Backtrace());
+ }
+};
+
+} //namespace filedistribution
+
+#if BOOST_VERSION < 103700
+#define BOOST_THROW_EXCEPTION(x)\
+ ::boost::throw_exception( ::boost::enable_error_info(x) << \
+ ::boost::throw_function(BOOST_CURRENT_FUNCTION) << \
+ ::boost::throw_file(__FILE__) << \
+ ::boost::throw_line((int)__LINE__) )
+
+#endif
+
+
+//********** Begin: Please remove when fixed upstream.
+//boost 1.36 & 1.37 bugfix: allow attaching a boost::filesytem::path to a boost::exception
+//using the error info mechanism.
+#include <boost/filesystem/path.hpp>
+
+namespace boost{
+namespace to_string_detail {
+std::basic_ostream<boost::filesystem::path::string_type::value_type,
+ boost::filesystem::path::string_type::traits_type > &
+operator<<
+( std::basic_ostream<boost::filesystem::path::string_type::value_type,
+ boost::filesystem::path::string_type::traits_type >& os, const boost::filesystem::path & ph );
+}
+}
+
+//********** End: Please remove when fixed upstream.
+
diff --git a/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h
new file mode 100644
index 00000000000..28b45546a64
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/exceptionrethrower.h
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/exception_ptr.hpp>
+#include <boost/type_traits/is_polymorphic.hpp>
+
+namespace filedistribution {
+
+//used for rethrowing an exceptions in a different context
+class ExceptionRethrower {
+ boost::exception_ptr _exceptionPtr; //not a pod, default constructed to null value
+
+ mutable boost::mutex _exceptionMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+
+public:
+ void rethrow() const {
+ LockGuard guard(_exceptionMutex);
+
+ if (_exceptionPtr)
+ boost::rethrow_exception(_exceptionPtr);
+ }
+
+ bool exceptionStored() const {
+ LockGuard guard(_exceptionMutex);
+ return _exceptionPtr;
+ }
+
+ template <class T>
+ void store(const T& exception) {
+ boost::exception_ptr exceptionPtr = boost::copy_exception(exception);
+ store(exceptionPtr);
+ }
+
+ void store(const boost::exception_ptr exceptionPtr) {
+ LockGuard guard(_exceptionMutex);
+
+ if (!_exceptionPtr) //only store the first exception to be rethrowed.
+ _exceptionPtr = exceptionPtr;
+ }
+};
+
+} //namespace filedistribution
+
+
diff --git a/filedistribution/src/vespa/filedistribution/common/logfwd.h b/filedistribution/src/vespa/filedistribution/common/logfwd.h
new file mode 100644
index 00000000000..1a2d7a559bf
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/logfwd.h
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace filedistribution {
+
+/** To avoid requiring vespa log from the jni library*/
+namespace logfwd {
+
+enum LogLevel { debug, error, warning, info };
+
+void log(LogLevel level, const char *file, int line, const char *fmt, ...)
+ __attribute__((format(printf,4,5)));
+
+} //namespace logfwd
+} //namespace filedistribution
+
+#define LOGFWD(level, ...) filedistribution::logfwd::log(filedistribution::logfwd::level, \
+ __FILE__, __LINE__, __VA_ARGS__)
+
+
diff --git a/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp b/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp
new file mode 100644
index 00000000000..ab2a2334434
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/common/vespa_logfwd.cpp
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <stdarg.h>
+#include <boost/scoped_array.hpp>
+
+#include "logfwd.h"
+
+#include <vespa/log/log.h>
+
+LOG_SETUP(".common.model");
+
+
+using ns_log::Logger;
+
+namespace {
+ Logger::LogLevel toVespaLogLevel(filedistribution::logfwd::LogLevel level) {
+ namespace l = filedistribution::logfwd;
+
+ switch (level) {
+ case l::info: return Logger::info;
+ case l::debug: return Logger::debug;
+ case l::error: return Logger::error;
+ case l::warning: return Logger::warning;
+ default:
+ LOG(error, "Unknown log level, falling back to error");
+ return Logger::error;
+ }
+ }
+}
+
+void filedistribution::logfwd::log(LogLevel level, const char* file, int line, const char* fmt, ...)
+{
+
+ Logger::LogLevel vespaLogLevel = toVespaLogLevel(level);
+
+ if (logger.wants(vespaLogLevel)) {
+ const size_t maxSize(0x8000);
+ boost::scoped_array<char> payload(new char[maxSize]);
+
+ va_list args;
+ va_start(args, fmt);
+ vsnprintf(payload.get(), maxSize, fmt, args);
+ va_end(args);
+
+ logger.doLog(vespaLogLevel, file, line, payload.get());
+ }
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt
new file mode 100644
index 00000000000..1e1baf01e61
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/CMakeLists.txt
@@ -0,0 +1,15 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_distributor STATIC
+ SOURCES
+ filedistributortrackerimpl.cpp
+ filedownloader.cpp
+ filedownloadermanager.cpp
+ hostname.cpp
+ scheduler.cpp
+ signalhandling.cpp
+ state_server_impl.cpp
+ DEPENDS
+)
+target_compile_options(filedistribution_distributor PRIVATE -DTORRENT_DEBUG -DTORRENT_DISABLE_ENCRYPTION -DTORRENT_DISABLE_DHT -DWITH_SHIPPED_GEOIP_H -DBOOST_ASIO_HASH_MAP_BUCKETS=1021 -DBOOST_EXCEPTION_DISABLE -DBOOST_ASIO_ENABLE_CANCELIO -DBOOST_ASIO_DYN_LINK -DTORRENT_LINKING_SHARED)
+vespa_generate_config(filedistribution_distributor filedistributor.def)
+install(FILES filedistributor.def DESTINATION var/db/vespa/config_server/serverdb/classes)
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributor.def b/filedistribution/src/vespa/filedistribution/distributor/filedistributor.def
new file mode 100644
index 00000000000..654a98de5c4
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributor.def
@@ -0,0 +1,10 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+namespace=cloud.config.filedistribution
+
+torrentport int
+stateport int default = 0
+hostname string
+filedbpath string
+
+maxdownloadspeed double
+maxuploadspeed double
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
new file mode 100644
index 00000000000..4b6fbc96cb3
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.cpp
@@ -0,0 +1,203 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "filedistributortrackerimpl.h"
+
+#include <cmath>
+
+#include <libtorrent/tracker_manager.hpp>
+#include <libtorrent/torrent.hpp>
+
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include "filedownloader.h"
+#include "hostname.h"
+
+#include <vespa/log/log.h>
+LOG_SETUP(".filedistributiontrackerimpl");
+
+using filedistribution::FileDistributorTrackerImpl;
+using filedistribution::FileDownloader;
+using filedistribution::FileDistributionModel;
+using filedistribution::Scheduler;
+using filedistribution::ExceptionRethrower;
+
+typedef FileDistributionModel::PeerEntries PeerEntries;
+
+namespace asio = boost::asio;
+
+namespace {
+
+void
+filterSelf(FileDistributionModel::PeerEntries& peers,
+ const std::string& hostName,
+ int port)
+{
+ FileDistributionModel::PeerEntries::iterator
+ i = peers.begin(),
+ currEnd = peers.end();
+
+ while ( i != currEnd ) {
+ //hostName is currently used in the ip field
+ if (i->ip == hostName && i->port == port) {
+ --currEnd;
+ std::swap(*i, *currEnd);
+ } else {
+ ++i;
+ }
+ }
+
+ peers.erase(currEnd, peers.end());
+}
+
+void resolveIPAddresses(PeerEntries& peers) {
+ for (auto& p: peers) {
+ try {
+ p.ip = filedistribution::lookupIPAddress(p.ip);
+ } catch (filedistribution::FailedResolvingHostName& e) {
+ LOG(info, "Failed resolving address %s", p.ip.c_str());
+ }
+ }
+}
+
+struct TrackingTask : public Scheduler::Task {
+ int _numTimesRescheduled;
+
+ libtorrent::tracker_request _trackerRequest;
+ boost::weak_ptr<libtorrent::torrent> _torrent;
+ boost::weak_ptr<FileDownloader> _downloader;
+ boost::shared_ptr<FileDistributionModel> _model;
+
+ TrackingTask(Scheduler& scheduler,
+ const libtorrent::tracker_request& trackerRequest,
+ const boost::shared_ptr<libtorrent::torrent>& torrent,
+ const boost::weak_ptr<FileDownloader>& downloader,
+ const boost::shared_ptr<FileDistributionModel>& model)
+ : Task(scheduler),
+ _numTimesRescheduled(0),
+ _trackerRequest(trackerRequest),
+ _torrent(torrent),
+ _downloader(downloader),
+ _model(model)
+ {}
+
+ //TODO: refactor
+ void doHandle() {
+ if (boost::shared_ptr<FileDownloader> downloader = _downloader.lock()) {
+ //All torrents must be destructed before the session is destructed.
+ //It's okay to prevent the torrent from expiring here
+ //since the session can't be destructed while
+ //we hold a shared_ptr to the downloader.
+ if (boost::shared_ptr<libtorrent::torrent> torrent = _torrent.lock()) {
+ PeerEntries peers = getPeers(downloader);
+
+ if (!peers.empty()) {
+ torrent->session().m_io_service.dispatch(
+ [torrent_weak_ptr = _torrent, trackerRequest = _trackerRequest, peers = peers]() mutable {
+ if (auto torrent_sp = torrent_weak_ptr.lock()) {
+ torrent_sp->tracker_response(
+ trackerRequest,
+ libtorrent::address(),
+ std::list<libtorrent::address>(),
+ peers,
+ -1, -1, -1, -1, -1,
+ libtorrent::address(), "trackerid");
+ }
+ });
+ }
+
+ if (peers.size() < 5) {
+ reschedule();
+ }
+ }
+ }
+ }
+
+ PeerEntries getPeers(const boost::shared_ptr<FileDownloader>& downloader) {
+ std::string fileReference = downloader->infoHash2FileReference(_trackerRequest.info_hash);
+
+ const size_t recommendedMaxNumberOfPeers = 30;
+ PeerEntries peers = _model->getPeers(fileReference, recommendedMaxNumberOfPeers);
+
+ //currently, libtorrent stops working if it tries to connect to itself.
+ filterSelf(peers, downloader->_hostName, downloader->_port);
+ resolveIPAddresses(peers);
+ for (const auto& peer: peers) {
+ LOG(debug, "Returning peer with ip %s", peer.ip.c_str());
+ }
+
+ return peers;
+ }
+
+ void reschedule() {
+ if (_numTimesRescheduled < 5) {
+ double fudgeFactor = 0.1;
+ schedule(boost::posix_time::seconds(static_cast<int>(
+ std::pow(3., _numTimesRescheduled) + fudgeFactor)));
+ _numTimesRescheduled++;
+ }
+ }
+};
+
+
+void
+workerFunction(boost::shared_ptr<ExceptionRethrower> exceptionRethrower, asio::io_service& ioService)
+{
+ while (!boost::this_thread::interruption_requested()) {
+ try {
+ //No reset needed after handling exceptions.
+ ioService.run();
+ } catch(const boost::thread_interrupted&) {
+ LOG(debug, "Tracker worker thread interrupted.");
+ throw;
+ } catch(...) {
+ exceptionRethrower->store(boost::current_exception());
+ }
+ }
+}
+
+} //anonymous namespace
+
+
+FileDistributorTrackerImpl::FileDistributorTrackerImpl(
+ const boost::shared_ptr<FileDistributionModel>& model,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
+ :_exceptionRethrower(exceptionRethrower),
+ _model(model)
+{}
+
+
+FileDistributorTrackerImpl::~FileDistributorTrackerImpl() {
+ LOG(debug, "Deconstructing FileDistributorTrackerImpl");
+
+ LockGuard guard(_mutex);
+ _scheduler.reset();
+}
+
+
+void
+FileDistributorTrackerImpl::trackingRequest(
+ libtorrent::tracker_request& request,
+ const boost::shared_ptr<libtorrent::torrent> & torrent)
+{
+ LockGuard guard(_mutex);
+
+ if (torrent != boost::shared_ptr<libtorrent::torrent>()) {
+ boost::shared_ptr<TrackingTask> trackingTask(new TrackingTask(
+ *_scheduler.get(), request, torrent, _downloader, _model));
+
+ trackingTask->scheduleNow();
+ }
+}
+
+
+void
+FileDistributorTrackerImpl::setDownloader(const boost::shared_ptr<FileDownloader>& downloader)
+{
+ LockGuard guard(_mutex);
+
+ _scheduler.reset();
+ _downloader = downloader;
+
+ if (downloader) {
+ _scheduler.reset(new Scheduler(boost::bind(&workerFunction, _exceptionRethrower, _1)));
+ }
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
new file mode 100644
index 00000000000..edbdb9b8943
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedistributortrackerimpl.h
@@ -0,0 +1,45 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <libtorrent/session.hpp>
+#include <libtorrent/torrent.hpp>
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/asio/deadline_timer.hpp>
+
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+#include "scheduler.h"
+
+namespace filedistribution {
+class FileDistributionModel;
+class FileDownloader;
+
+class FileDistributorTrackerImpl : public FileDistributionTracker {
+ const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ const boost::shared_ptr<FileDistributionModel> _model;
+
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _mutex;
+ boost::weak_ptr<FileDownloader> _downloader;
+
+ //Use separate worker thread to avoid potential deadlock
+ //between tracker requests and files to download changed requests.
+ boost::scoped_ptr<Scheduler> _scheduler;
+public:
+ FileDistributorTrackerImpl(const boost::shared_ptr<FileDistributionModel>& model,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower);
+
+ virtual ~FileDistributorTrackerImpl();
+
+ //overrides
+ void trackingRequest(libtorrent::tracker_request& request,
+ const boost::shared_ptr<libtorrent::torrent> & torrent);
+
+ void setDownloader(const boost::shared_ptr<FileDownloader>& downloader);
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
new file mode 100644
index 00000000000..57fae9df6ee
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
@@ -0,0 +1,437 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "filedownloader.h"
+#include "hostname.h"
+
+#include <iterator>
+#include <algorithm>
+
+#include <boost/filesystem.hpp>
+#include <boost/filesystem/fstream.hpp>
+#include <boost/filesystem/convenience.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/function_output_iterator.hpp>
+#include <boost/foreach.hpp>
+#include <boost/thread.hpp>
+
+#include <libtorrent/alert.hpp>
+#include <libtorrent/alert_types.hpp>
+#include <libtorrent/torrent_handle.hpp>
+#include <libtorrent/bencode.hpp>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".filedownloader");
+
+using filedistribution::FileDownloader;
+namespace fs = boost::filesystem;
+
+using libtorrent::sha1_hash;
+using libtorrent::torrent_handle;
+
+namespace {
+const std::string resumeDataSuffix = ".resume";
+const std::string resumeDataSuffixTemp = ".resumetemp";
+const std::string newFileSuffix = ".new";
+
+//TODO: temporarily duplicated from filedistributionmanager
+std::string
+fileReferenceToString(const libtorrent::sha1_hash& fileReference) {
+ std::ostringstream fileReferenceString;
+ fileReferenceString <<fileReference;
+
+ assert (fileReferenceString.str().size() == 40);
+ return fileReferenceString.str();
+}
+
+libtorrent::sha1_hash
+toInfoHash(const std::string& fileReference) {
+ assert (fileReference.size() == 40);
+ std::istringstream s(fileReference);
+
+ sha1_hash infoHash;
+ s >> infoHash;
+ return infoHash;
+}
+
+void
+addNewFile(const fs::path& dbPath, const fs::path& newFile) {
+ LOG(debug, "Adding new file: '%s'.", newFile.string().c_str());
+ const fs::path destination = dbPath / newFile.stem();
+
+ if ( fs::exists(destination) )
+ fs::remove_all(destination);
+
+ fs::path resumeData = destination.string() + resumeDataSuffix;
+ if ( fs::exists(resumeData) )
+ fs::remove(resumeData);
+
+ fs::rename(newFile, destination);
+}
+
+void
+addNewDbFiles(const fs::path& dbPath) {
+ for (fs::directory_iterator i(dbPath), end; i != end; ++i) {
+ if (newFileSuffix == fs::extension(*i))
+ addNewFile(dbPath, *i);
+ }
+}
+
+fs::path
+resumeDataPath(const libtorrent::torrent_handle& torrent) {
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ fs::path tmp(torrent.save_path());
+#pragma GCC diagnostic pop
+ return tmp.string() + resumeDataSuffix;
+}
+
+fs::path
+resumeDataPathTemp(const libtorrent::torrent_handle& torrent) {
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ fs::path tmp(torrent.save_path());
+#pragma GCC diagnostic pop
+ return tmp.string() + resumeDataSuffixTemp;
+}
+
+fs::path
+getMainFile(const libtorrent::torrent_handle& handle) {
+ const libtorrent::torrent_info fallback(handle.info_hash());
+ auto p = handle.torrent_file();
+ const libtorrent::torrent_info& info = (p ? *p : fallback);
+ return info.files().num_files() == 1 ?
+ info.file_at(0).path :
+ info.name();
+}
+
+std::string
+getMainName(const libtorrent::torrent_handle& handle) {
+ const libtorrent::torrent_info fallback(handle.info_hash());
+ auto p = handle.torrent_file();
+ const libtorrent::torrent_info& info = (p ? *p : fallback);
+ return info.name();
+}
+
+libtorrent::session_settings
+createSessionSettings() {
+ libtorrent::session_settings s;
+
+ const int unlimited = -1;
+ s.active_downloads = s.active_seeds = s.active_limit = unlimited;
+
+ s.min_reconnect_time = 1; //seconds
+ s.min_announce_interval = 5 * 60; //seconds
+ return s;
+}
+
+} //anonymous namespace
+
+struct FileDownloader::EventHandler
+{
+ FileDownloader& _fileDownloader;
+
+ EventHandler(FileDownloader* fileDownloader)
+ : _fileDownloader(*fileDownloader)
+ {}
+
+ void defaultHandler(const libtorrent::alert& alert) const {
+ LOG(debug, "alert %s: %s", alert.what(), alert.message().c_str());
+ }
+
+ void operator()(const libtorrent::listen_failed_alert& alert) const {
+ BOOST_THROW_EXCEPTION(std::runtime_error(alert.message()));
+ }
+ void operator()(const libtorrent::fastresume_rejected_alert& alert) const {
+ LOG(info, "alert %s: %s", alert.what(), alert.message().c_str());
+ }
+ void operator()(const libtorrent::torrent_delete_failed_alert& alert) const {
+ LOG(warning, "alert %s: %s", alert.what(), alert.message().c_str());
+ }
+ void operator()(const libtorrent::file_error_alert& alert) const {
+ LOG(error, "alert %s: %s", alert.what(), alert.message().c_str());
+ }
+
+ void operator()(const libtorrent::torrent_finished_alert& alert) const {
+ defaultHandler(alert);
+
+ std::string fileReference = fileReferenceToString(alert.handle.info_hash());
+
+ LOG(debug, "File '%s' with file reference '%s' downloaded successfully.",
+ getMainName(alert.handle).c_str(),
+ fileReference.c_str());
+
+ _fileDownloader.signalIfFinishedDownloading(fileReference);
+ alert.handle.save_resume_data();
+ _fileDownloader.didRequestSRD();
+ }
+
+ void operator()(const libtorrent::save_resume_data_failed_alert& alert) const {
+ LOG(warning, "save resume data failed: %s -- %s",
+ alert.what(), alert.message().c_str());
+ _fileDownloader.didReceiveSRD();
+ }
+
+ void operator()(const libtorrent::save_resume_data_alert& alert) const {
+ defaultHandler(alert);
+
+ fs::ofstream resumeFile(resumeDataPathTemp(alert.handle),
+ std::ios_base::binary);
+ resumeFile.unsetf(std::ios_base::skipws);
+ libtorrent::bencode(std::ostream_iterator<char>(resumeFile),
+ *alert.resume_data);
+ resumeFile.close();
+ fs::rename(resumeDataPathTemp(alert.handle), resumeDataPath(alert.handle));
+ _fileDownloader.didReceiveSRD();
+ }
+
+ void handle(std::unique_ptr<libtorrent::alert> alert) {
+ try {
+ libtorrent::handle_alert<
+ libtorrent::torrent_finished_alert,
+ libtorrent::save_resume_data_alert,
+ libtorrent::save_resume_data_failed_alert,
+ libtorrent::listen_failed_alert,
+ libtorrent::file_error_alert,
+ libtorrent::fastresume_rejected_alert,
+ libtorrent::torrent_delete_failed_alert>
+ dispatch(alert, *this);
+ } catch (libtorrent::unhandled_alert& e) {
+ LOG(debug, "alert (ignored): %s -- %s",
+ alert->what(), alert->message().c_str());
+ }
+ }
+};
+
+FileDownloader::LogSessionDeconstructed::~LogSessionDeconstructed()
+{
+ LOG(debug, "Libtorrent session closed successfully.");
+}
+
+FileDownloader::FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker,
+ const std::string& hostName, int port,
+ const fs::path& dbPath,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
+ : _outstanding_SRD_requests(0),
+ _tracker(tracker),
+ _session(tracker.get(), libtorrent::fingerprint("vp", 0, 0, 0, 0), 0),
+ _dbPath(dbPath),
+ _exceptionRethrower(exceptionRethrower),
+ _hostName(hostName),
+ _port(port)
+{
+ if (!fs::exists(_dbPath))
+ fs::create_directories(_dbPath);
+ addNewDbFiles(_dbPath);
+
+ _session.set_settings(createSessionSettings());
+ listen();
+ _session.set_alert_mask(
+ libtorrent::alert::error_notification |
+ libtorrent::alert::status_notification);
+
+}
+
+FileDownloader::~FileDownloader() {
+ EventHandler eventHandler(this);
+ size_t cnt = 0;
+ do {
+ LOG(debug, "destructor waiting for %zu SRD alerts", _outstanding_SRD_requests);
+ while (_session.wait_for_alert(libtorrent::milliseconds(20))) {
+ std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
+ eventHandler.handle(std::move(alert));
+ ++cnt;
+ }
+ } while (_outstanding_SRD_requests > 0);
+ LOG(debug, "handled %zu alerts in destructor", cnt);
+}
+
+void
+FileDownloader::listen() {
+ for (int retries = 0; retries < 5; ++retries) {
+ boost::system::error_code ec;
+ _session.listen_on(std::make_pair(_port, _port), ec); // (min, max)
+ //If libtorrent fails listening on the specified port,
+ //it will automatically try to use port 0.
+ if (!ec && _session.listen_port() == _port)
+ return;
+ perror("Listen failed");
+ LOG(debug, "Failed listening on '%d' message='%s'", _port, ec.message().c_str());
+ boost::this_thread::sleep(boost::posix_time::milliseconds(500));
+ }
+
+ BOOST_THROW_EXCEPTION(FailedListeningException(_hostName, _port));
+}
+
+boost::optional< fs::path >
+FileDownloader::pathToCompletedFile(const std::string& fileReference) const {
+ libtorrent::torrent_handle torrent = _session.find_torrent(toInfoHash(fileReference));
+
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ if (torrent.is_valid() && torrent.is_finished()) {
+#pragma GCC diagnostic pop
+ return _dbPath / fileReference / getMainFile(torrent);
+ } else {
+ return boost::optional< fs::path>();
+ }
+}
+
+
+boost::optional<FileDownloader::ResumeDataBuffer>
+FileDownloader::getResumeData(const std::string& fileReference) {
+ LOG(debug, ("Reading resume data for " + fileReference).c_str());
+ try {
+ fs::path path = (_dbPath / fileReference).string() + resumeDataSuffix;
+ if (fs::exists(path)) {
+ fs::ifstream file(path, std::ios::binary);
+ ResumeDataBuffer result;
+
+ std::istream_iterator<char> iterator(file), end;
+ std::copy(iterator, end, std::back_inserter(result));
+ LOG(debug, ("Successfully retrieved resume data for " + fileReference).c_str());
+ if (result.size() < 50) {
+ LOG(info, "Very small resume file %zu bytes.", result.size());
+ }
+
+ return result;
+ }
+ } catch(...) {
+ //resume data is only an optimization
+ LOG(info, ("Error while reading resume data for " + fileReference).c_str());
+ }
+ return boost::optional<ResumeDataBuffer>();
+}
+
+
+bool
+FileDownloader::hasTorrent(const std::string& fileReference) const {
+ return _session.find_torrent(toInfoHash(fileReference)).is_valid();
+}
+
+void
+FileDownloader::addTorrent(const std::string& fileReference, const Buffer& buffer) {
+ LockGuard guard(_modifyTorrentsDownloadingMutex);
+
+ boost::optional<ResumeDataBuffer> resumeData = getResumeData(fileReference);
+
+ if (_session.find_torrent( (toInfoHash(fileReference))).is_valid())
+ return;
+
+ libtorrent::lazy_entry entry;
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ libtorrent::lazy_bdecode(&*buffer.begin(), &*buffer.end(),
+ entry); //out
+#pragma GCC diagnostic pop
+
+ libtorrent::add_torrent_params torrentParams;
+ torrentParams.save_path = (_dbPath / fileReference).string();
+ torrentParams.ti = new libtorrent::torrent_info(entry);
+
+ torrentParams.auto_managed = false;
+ torrentParams.paused = false;
+
+ if (resumeData)
+ torrentParams.resume_data = *resumeData; //vector will be swapped
+
+ libtorrent::torrent_handle torrentHandle = _session.add_torrent(torrentParams);
+
+ LOG(debug, "Started downloading file '%s' with file reference '%s'.",
+ getMainName(torrentHandle).c_str(), fileReference.c_str());
+}
+
+
+void
+FileDownloader::deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&) {
+ if (torrent.is_valid()) {
+ fs::path resumeFilePath = resumeDataPath(torrent);
+
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ fs::path savePath = torrent.save_path();
+#pragma GCC diagnostic pop
+
+ //the files might not exist, so ignore return value
+ fs::remove_all(savePath);
+ fs::remove(resumeFilePath);
+ }
+
+ _downloadFailed(fileReferenceToString(torrent.info_hash()), FileProvider::FileReferenceRemoved);
+}
+
+void
+FileDownloader::removeAllTorrentsBut(const std::set<std::string> & filesToRetain) {
+ LockGuard guard(_modifyTorrentsDownloadingMutex);
+
+ std::set<std::string> currentFiles;
+ namespace ll = boost::lambda;
+
+ std::set<sha1_hash> infoHashesToRetain;
+ BOOST_FOREACH(const std::string& fileReference, filesToRetain) {
+ infoHashesToRetain.insert(toInfoHash(fileReference));
+ }
+
+ std::vector<torrent_handle> torrents = _session.get_torrents();
+
+ BOOST_FOREACH(torrent_handle torrent, torrents) {
+ if (!infoHashesToRetain.count(torrent.info_hash())) {
+ LOG(info, "Removing torrent: '%s' with file reference '%s'",
+ getMainName(torrent).c_str(),
+ fileReferenceToString(torrent.info_hash()).c_str());
+
+ deleteTorrentData(torrent, guard);
+ _session.remove_torrent(torrent);
+ }
+ }
+}
+
+
+void FileDownloader::runEventLoop() {
+ EventHandler eventHandler(this);
+ try {
+ while (!boost::this_thread::interruption_requested()) {
+ if (_session.wait_for_alert(libtorrent::milliseconds(100))) {
+ std::unique_ptr<libtorrent::alert> alert = _session.pop_alert();
+ eventHandler.handle(std::move(alert));
+ }
+ }
+ } catch(const boost::thread_interrupted&) {
+ LOG(spam, "The FileDownloader thread was interrupted.");
+ } catch(...) {
+ _exceptionRethrower->store(boost::current_exception());
+ }
+}
+
+void
+FileDownloader::signalIfFinishedDownloading(const std::string& fileReference) {
+ boost::optional<fs::path> path = pathToCompletedFile(fileReference);
+
+ if (path) {
+ _downloadCompleted(fileReference, *path);
+ }
+}
+
+std::string
+FileDownloader::infoHash2FileReference(const libtorrent::sha1_hash& infoHash) {
+ //TODO
+ return fileReferenceToString(infoHash);
+}
+
+namespace {
+int
+toBytesPerSec(double MBPerSec) {
+ return static_cast<int>(MBPerSec * 1024 * 1024);
+}
+}
+
+void
+FileDownloader::setMaxDownloadSpeed(double MBPerSec) {
+ LOG(config, "Setting max download speed to %f MB/sec", MBPerSec);
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ _session.set_download_rate_limit(toBytesPerSec(MBPerSec));
+#pragma GCC diagnostic pop
+}
+
+void
+FileDownloader::setMaxUploadSpeed(double MBPerSec) {
+ LOG(config, "Setting max upload speed to %f MB/sec", MBPerSec);
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ _session.set_upload_rate_limit(toBytesPerSec(MBPerSec));
+#pragma GCC diagnostic pop
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
new file mode 100644
index 00000000000..467d1c8f2c8
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.h
@@ -0,0 +1,94 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vector>
+#include <boost/thread/mutex.hpp>
+#include <boost/filesystem/path.hpp>
+#include <boost/optional.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/indexed_by.hpp>
+#include <boost/multi_index/member.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+
+#include <libtorrent/session.hpp>
+
+#include <vespa/filedistribution/rpc/fileprovider.h>
+#include "hostname.h"
+#include <vespa/filedistribution/common/buffer.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+#include <vespa/filedistribution/common/exception.h>
+
+namespace filedistribution {
+
+struct NoSuchTorrentException : public Exception {};
+
+struct FailedListeningException : public Exception {
+ FailedListeningException(const std::string& hostName, int port) {
+ *this <<errorinfo::HostName(hostName) <<errorinfo::Port(port);
+ }
+};
+
+class FileDownloader
+{
+ struct EventHandler;
+ struct LogSessionDeconstructed {
+ ~LogSessionDeconstructed();
+ };
+
+ size_t _outstanding_SRD_requests;
+ boost::shared_ptr<FileDistributionTracker> _tracker;
+
+ boost::mutex _modifyTorrentsDownloadingMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+
+ LogSessionDeconstructed _logSessionDeconstructed;
+ //session is safe to use from multiple threads.
+ libtorrent::session _session;
+
+ const boost::filesystem::path _dbPath;
+ typedef std::vector<char> ResumeDataBuffer;
+ boost::optional<ResumeDataBuffer> getResumeData(const std::string& fileReference);
+
+ class RemoveTorrent;
+
+ void deleteTorrentData(const libtorrent::torrent_handle& torrent, LockGuard&);
+ void listen();
+public:
+ // accounting of save-resume-data requests:
+ void didRequestSRD() { ++_outstanding_SRD_requests; }
+ void didReceiveSRD() { --_outstanding_SRD_requests; }
+
+ typedef FileProvider::DownloadCompletedSignal DownloadCompletedSignal;
+ typedef FileProvider::DownloadFailedSignal DownloadFailedSignal;
+
+ FileDownloader(const boost::shared_ptr<FileDistributionTracker>& tracker,
+ const std::string& hostName, int port,
+ const boost::filesystem::path& dbPath,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower);
+ ~FileDownloader();
+
+ void runEventLoop();
+ void addTorrent(const std::string& fileReference, const Buffer& buffer);
+ bool hasTorrent(const std::string& fileReference) const;
+ boost::optional<boost::filesystem::path> pathToCompletedFile(const std::string& fileReference) const;
+ void removeAllTorrentsBut(const std::set<std::string> & filesToRetain);
+
+ void signalIfFinishedDownloading(const std::string& fileReference);
+
+ std::string infoHash2FileReference(const libtorrent::sha1_hash& hash);
+ void setMaxDownloadSpeed(double MBPerSec);
+ void setMaxUploadSpeed(double MBPerSec);
+
+ const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+
+ const std::string _hostName;
+ const int _port;
+
+ //signals
+ DownloadCompletedSignal _downloadCompleted;
+ DownloadFailedSignal _downloadFailed; //removed or error
+};
+
+} //namespace filedistribution
+
+
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
new file mode 100644
index 00000000000..af66ed86afa
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
@@ -0,0 +1,148 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".filedownloadermanager");
+
+#include "filedownloadermanager.h"
+
+#include <iterator>
+#include <sstream>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/thread.hpp>
+
+using filedistribution::FileDownloaderManager;
+
+namespace lambda = boost::lambda;
+
+namespace {
+void logStartDownload(const std::set<std::string> & filesToDownload) {
+ std::ostringstream msg;
+ msg <<"StartDownloads:" <<std::endl;
+ std::copy(filesToDownload.begin(), filesToDownload.end(),
+ std::ostream_iterator<std::string>(msg, "\n"));
+ LOG(debug, msg.str().c_str());
+}
+} //anonymous namespace
+
+FileDownloaderManager::FileDownloaderManager(
+ const boost::shared_ptr<FileDownloader>& downloader,
+ const boost::shared_ptr<FileDistributionModel>& model)
+
+ :_fileDownloader(downloader),
+ _fileDistributionModel(model),
+ _startDownloads(this),
+ _setFinishedDownloadingStatus(this)
+{}
+
+FileDownloaderManager::~FileDownloaderManager() {
+ LOG(debug, "Deconstructing FileDownloaderManager");
+}
+
+void
+FileDownloaderManager::start()
+{
+ _downloadFailedConnection =
+ downloadFailed().connect(
+ DownloadFailedSignal::slot_type(lambda::bind(&FileDownloaderManager::removePeerStatus, this, lambda::_1)).
+ track(shared_from_this()));
+
+ _downloadCompletedConnection =
+ downloadCompleted().connect(
+ DownloadCompletedSignal::slot_type(_setFinishedDownloadingStatus).
+ track(shared_from_this()));
+
+ _filesToDownloadChangedConnection =
+ _fileDistributionModel->_filesToDownloadChanged.connect(
+ FileDistributionModel::FilesToDownloadChangedSignal::slot_type(boost::ref(_startDownloads)).
+ track(shared_from_this()));
+}
+
+boost::optional< boost::filesystem::path >
+FileDownloaderManager::getPath(const std::string& fileReference) {
+ return _fileDownloader->pathToCompletedFile(fileReference);
+}
+
+void
+FileDownloaderManager::downloadFile(const std::string& fileReference) {
+ {
+ LockGuard updateFilesToDownloadGuard(_updateFilesToDownloadMutex);
+ _startDownloads.downloadFile(fileReference);
+ }
+
+ //if the file is already downloading but not completed before the above call,
+ //the finished download callback might come before the interested party
+ //has called connectFinishedDownloadingHandler.
+ //An explicit call is therefore used to mitigate this problem:
+ //Do not hold updateFilesToDownloadMutex when calling this, as it might cause deadlock.
+ _fileDownloader->signalIfFinishedDownloading(fileReference);
+}
+
+void
+FileDownloaderManager::removePeerStatus(const std::string& fileReference) {
+ //TODO: Simplify by using separate thread for removal:
+ //currently called via StartDownloads which already holds a lock on updateFilesToDownloadMutex.
+
+ _fileDistributionModel->removePeer(fileReference);
+}
+
+void
+FileDownloaderManager::StartDownloads::downloadFile(const std::string& fileReference) {
+ if (!_parent._fileDownloader->hasTorrent(fileReference)) {
+ Buffer torrent(
+ _parent._fileDistributionModel->getFileDBModel().getFile(fileReference));
+
+ _parent._fileDistributionModel->addPeer(fileReference);
+ _parent._fileDownloader->addTorrent(fileReference, torrent);
+ }
+}
+
+
+void
+FileDownloaderManager::StartDownloads::operator()() {
+ namespace ll = boost::lambda;
+
+ LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex);
+
+ std::set<std::string> filesToDownload =
+ _parent._fileDistributionModel->getFilesToDownload();
+ logStartDownload(filesToDownload);
+
+ std::for_each(filesToDownload.begin(), filesToDownload.end(),
+ ll::bind(&StartDownloads::downloadFile, this, ll::_1));
+
+ _parent._fileDownloader->removeAllTorrentsBut(filesToDownload);
+}
+
+FileDownloaderManager::StartDownloads::StartDownloads(FileDownloaderManager* parent)
+ :_parent(*parent)
+{}
+
+
+FileDownloaderManager::SetFinishedDownloadingStatus::SetFinishedDownloadingStatus(
+ FileDownloaderManager* parent)
+ :_parent(*parent)
+{}
+
+void
+FileDownloaderManager::SetFinishedDownloadingStatus::operator()(
+ const std::string& fileReference, const boost::filesystem::path&) {
+
+ //Prevent concurrent modifications to peer node in zk.
+ LockGuard updateFilesToDownloadGuard(_parent._updateFilesToDownloadMutex);
+
+ try {
+ _parent._fileDistributionModel->peerFinished(fileReference);
+ } catch(const FileDistributionModel::NotPeer&) { //Probably a concurrent removal of the torrent.
+
+ //improve chance of libtorrent session being updated.
+ boost::this_thread::sleep(boost::posix_time::milliseconds(100));
+ if (_parent._fileDownloader->hasTorrent(fileReference)) {
+
+ _parent._fileDistributionModel->addPeer(fileReference);
+ _parent._fileDistributionModel->peerFinished(fileReference);
+ } else {
+ LOG(debug, "OK: Torrent '%s' finished concurrently with its removal.", fileReference.c_str());
+ }
+ }
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
new file mode 100644
index 00000000000..edebe5a8423
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.h
@@ -0,0 +1,68 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/noncopyable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/signals2/signal.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+#include <vespa/filedistribution/rpc/fileprovider.h>
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include "filedownloader.h"
+
+namespace filedistribution {
+
+class FileDownloaderManager : public FileProvider,
+ boost::noncopyable,
+ public boost::enable_shared_from_this<FileDownloaderManager> {
+
+ class StartDownloads {
+ FileDownloaderManager& _parent;
+ public:
+ void operator()();
+ void downloadFile(const std::string& fileReference);
+ StartDownloads(FileDownloaderManager* parent);
+ };
+
+ class SetFinishedDownloadingStatus {
+ FileDownloaderManager& _parent;
+ public:
+ void operator()(const std::string& fileReference, const boost::filesystem::path&);
+ SetFinishedDownloadingStatus(FileDownloaderManager*);
+ };
+
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ boost::mutex _updateFilesToDownloadMutex;
+
+ boost::shared_ptr<FileDownloader> _fileDownloader;
+ boost::shared_ptr<FileDistributionModel> _fileDistributionModel;
+ StartDownloads _startDownloads;
+ SetFinishedDownloadingStatus _setFinishedDownloadingStatus;
+
+ boost::signals2::scoped_connection _downloadFailedConnection;
+ boost::signals2::scoped_connection _downloadCompletedConnection;
+ boost::signals2::scoped_connection _filesToDownloadChangedConnection;
+
+ void removePeerStatus(const std::string& fileReference);
+public:
+ FileDownloaderManager(const boost::shared_ptr<FileDownloader>&,
+ const boost::shared_ptr<FileDistributionModel>& model);
+ ~FileDownloaderManager();
+ void start();
+
+ boost::optional<boost::filesystem::path> getPath(const std::string& fileReference);
+ void downloadFile(const std::string& fileReference);
+
+ //FileProvider overrides
+ DownloadCompletedSignal& downloadCompleted() {
+ return _fileDownloader->_downloadCompleted;
+ }
+
+ DownloadFailedSignal& downloadFailed() {
+ return _fileDownloader->_downloadFailed;
+ }
+};
+
+} //namespace filedistribution
+
+
diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp b/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp
new file mode 100644
index 00000000000..acd5c982957
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/hostname.cpp
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "hostname.h"
+
+#include <boost/asio.hpp>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".hostname");
+#include <vespa/vespalib/net/socket_address.h>
+
+namespace asio = boost::asio;
+
+std::string
+filedistribution::lookupIPAddress(const std::string& hostName)
+{
+ auto best_addr = vespalib::SocketAddress::select_remote(0, hostName.c_str());
+ if (!best_addr.valid()) {
+ BOOST_THROW_EXCEPTION(filedistribution::FailedResolvingHostName(hostName));
+ }
+ const std::string address = best_addr.ip_address();
+ LOG(debug, "Resolved hostname'%s' as '%s'", hostName.c_str(), address.c_str());
+ return address;
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/hostname.h b/filedistribution/src/vespa/filedistribution/distributor/hostname.h
new file mode 100644
index 00000000000..12732029c92
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/hostname.h
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <string>
+#include <vespa/filedistribution/common/exception.h>
+
+namespace filedistribution {
+
+namespace errorinfo {
+typedef boost::error_info<struct tag_HostName, std::string> HostName;
+typedef boost::error_info<struct tag_Port, int> Port;
+};
+
+
+std::string lookupIPAddress(const std::string& hostName);
+
+struct FailedResolvingHostName : public Exception {
+ FailedResolvingHostName(const std::string& hostName) {
+ *this <<errorinfo::HostName(hostName);
+ }
+};
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
new file mode 100644
index 00000000000..1ae0b0b1f95
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.cpp
@@ -0,0 +1,49 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "scheduler.h"
+
+#include <boost/bind.hpp>
+
+#include <iostream>
+
+namespace asio = boost::asio;
+
+using filedistribution::Scheduler;
+typedef Scheduler::Task Task;
+
+Task::Task(Scheduler& scheduler)
+ : _timer(scheduler.ioService)
+{}
+
+void
+Task::schedule(asio::deadline_timer::duration_type delay)
+{
+ _timer.expires_from_now(delay);
+ _timer.async_wait(boost::bind(&Task::handle, shared_from_this(), _1));
+}
+
+void
+Task::scheduleNow()
+{
+ schedule(boost::posix_time::seconds(0));
+}
+
+void
+Task::handle(const boost::system::error_code& code) {
+ if (code != asio::error::operation_aborted) {
+ doHandle();
+ }
+}
+
+
+Scheduler::Scheduler(boost::function<void (asio::io_service&)> callRun)
+ :_keepAliveWork(ioService),
+ _workerThread(boost::bind(callRun, boost::ref(ioService)))
+{}
+
+Scheduler::~Scheduler() {
+ ioService.stop();
+ _workerThread.interrupt();
+ _workerThread.join();
+ ioService.reset();
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/scheduler.h b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
new file mode 100644
index 00000000000..75d4fa2d57b
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/scheduler.h
@@ -0,0 +1,45 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/asio/io_service.hpp>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
+
+
+namespace filedistribution {
+
+class Scheduler : boost::noncopyable {
+public:
+ class Task : public boost::enable_shared_from_this<Task> {
+ boost::asio::deadline_timer _timer;
+ public:
+ typedef boost::shared_ptr<Task> SP;
+
+ Task(Scheduler& scheduler);
+
+ virtual ~Task() {}
+
+ void schedule(boost::asio::deadline_timer::duration_type delay);
+ void scheduleNow();
+
+ void handle(const boost::system::error_code& code);
+ protected:
+ virtual void doHandle() = 0;
+ };
+
+private:
+ boost::asio::io_service ioService;
+
+ //keeps io_service.run() from exiting until it has been destructed,
+ //see http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/reference/io_service.html
+ boost::asio::io_service::work _keepAliveWork;
+ boost::thread _workerThread;
+
+public:
+ Scheduler(boost::function<void (boost::asio::io_service&)> callRun) ;
+ ~Scheduler();
+};
+
+}
+
diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp
new file mode 100644
index 00000000000..0465f8d7b29
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.cpp
@@ -0,0 +1,40 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "signalhandling.h"
+#include <vespa/vespalib/util/signalhandler.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".signalhandling");
+
+typedef vespalib::SignalHandler SIG;
+
+void
+initSignals() {
+ SIG::PIPE.ignore();
+ SIG::INT.hook();
+ SIG::TERM.hook();
+ SIG::USR1.hook();
+}
+
+bool
+askedToShutDown() {
+ bool result = SIG::INT.check() || SIG::TERM.check();
+ if (result) {
+ LOG(debug, "Asked to shut down.");
+ }
+ return result;
+}
+
+bool
+askedToReinitialize() {
+ bool result = SIG::USR1.check();
+ if (result) {
+ LOG(debug, "Asked to reinitialize.");
+ }
+ return result;
+}
+
+void
+clearReinitializeFlag() {
+ SIG::USR1.clear();
+}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h
new file mode 100644
index 00000000000..d140e101169
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/signalhandling.h
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+void
+initSignals();
+
+bool
+askedToShutDown();
+
+bool
+askedToReinitialize();
+
+void
+clearReinitializeFlag();
+
diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp
new file mode 100644
index 00000000000..95a0a1a64ca
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.cpp
@@ -0,0 +1,8 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include "state_server_impl.h"
+
+namespace filedistribution {
+
+} // namespace filedistribution
diff --git a/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h
new file mode 100644
index 00000000000..f52da27e597
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/distributor/state_server_impl.h
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/net/state_server.h>
+#include <vespa/vespalib/net/simple_metrics_producer.h>
+#include <vespa/vespalib/net/simple_health_producer.h>
+#include <vespa/vespalib/net/simple_component_config_producer.h>
+
+namespace filedistribution {
+
+struct StateServerImpl {
+ vespalib::SimpleHealthProducer myHealth;
+ vespalib::SimpleMetricsProducer myMetrics;
+ vespalib::SimpleComponentConfigProducer myComponents;
+ vespalib::StateServer myStateServer;
+
+ StateServerImpl(int port) : myStateServer(port, myHealth, myMetrics, myComponents) {}
+};
+
+} // namespace filedistribution
diff --git a/filedistribution/src/vespa/filedistribution/manager/.gitignore b/filedistribution/src/vespa/filedistribution/manager/.gitignore
new file mode 100644
index 00000000000..b94a4aae8e0
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/.gitignore
@@ -0,0 +1,5 @@
+.depend
+Makefile
+com_yahoo_vespa_filedistribution_*.h
+*.So
+/libfiledistributionmanager.so.5.1
diff --git a/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt
new file mode 100644
index 00000000000..0643983bb9b
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/CMakeLists.txt
@@ -0,0 +1,27 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_filedistributionmanager
+ SOURCES
+ ${CMAKE_CURRENT_BINARY_DIR}/com_yahoo_vespa_filedistribution_FileDistributionManager.h
+ createtorrent.cpp
+ filedb.cpp
+ filedistributionmanager.cpp
+ stderr_logfwd.cpp
+ $<TARGET_OBJECTS:filedistribution_filedbmodel>
+ $<TARGET_OBJECTS:filedistribution_exceptionrethrower>
+ INSTALL lib64
+ OUTPUT_NAME filedistributionmanager
+ DEPENDS
+ boost_system-mt-d
+ boost_thread-mt-d
+ boost_filesystem-mt-d
+ zookeeper_mt
+ ${JAVA_JVM_LIBRARY}
+)
+target_include_directories(filedistribution_filedistributionmanager PUBLIC SYSTEM ${JNI_INCLUDE_DIRS})
+add_custom_command(
+ OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/com_yahoo_vespa_filedistribution_FileDistributionManager.h
+ COMMAND javah -classpath ${PROJECT_SOURCE_DIR}/filedistributionmanager/target/filedistributionmanager.jar com.yahoo.vespa.filedistribution.FileDistributionManager
+ MAIN_DEPENDENCY ${PROJECT_SOURCE_DIR}/filedistributionmanager/target/filedistributionmanager.jar
+ WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
+)
+
diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
new file mode 100644
index 00000000000..001edd0e20a
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/createtorrent.cpp
@@ -0,0 +1,90 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "createtorrent.h"
+
+#include <iostream>
+#include <fstream>
+#include <cmath>
+#include <iterator>
+#include <sstream>
+#include <string>
+
+#include <boost/filesystem/convenience.hpp>
+#include <boost/lambda/lambda.hpp>
+
+#include "libtorrent/torrent_info.hpp"
+
+namespace fs = boost::filesystem;
+
+namespace {
+
+const libtorrent::size_type targetTorrentSize = 64 * 1024;
+
+void
+aggregateFilenames(std::vector<std::string>& accumulator, const fs::path& path, std::string currPrefix)
+{
+ if (fs::is_directory(path)) {
+ for (fs::directory_iterator i(path), end;
+ i != end;
+ ++i) {
+ std::string newPrefix = currPrefix + "/" + std::string(i->path().filename().c_str());
+ accumulator.push_back(newPrefix);
+ aggregateFilenames(accumulator, *i, newPrefix);
+ }
+ }
+}
+
+
+libtorrent::entry
+createEntry(const fs::path& path) {
+ if (!fs::exists(path))
+ throw std::runtime_error("Path '" + std::string(path.filename().c_str()) + " does not exists");
+
+ libtorrent::file_storage fileStorage;
+ libtorrent::add_files(fileStorage, path.string());
+
+ libtorrent::create_torrent torrent(fileStorage);
+ torrent.set_creator("vespa-filedistributor");
+ torrent.set_priv(true);
+ torrent.add_tracker("");
+
+ libtorrent::set_piece_hashes(torrent, path.branch_path().string());
+ return torrent.generate();
+}
+
+std::string
+fileReferenceToString(const libtorrent::sha1_hash& fileReference) {
+ std::ostringstream fileReferenceString;
+ fileReferenceString <<fileReference;
+ return fileReferenceString.str();
+}
+
+} //anonymous namespace
+
+filedistribution::
+CreateTorrent::
+CreateTorrent(const boost::filesystem::path& path)
+ :_path(path),
+ _entry(createEntry(_path))
+{}
+
+const filedistribution::Move<filedistribution::Buffer>
+filedistribution::
+CreateTorrent::
+bencode() const
+{
+ Buffer buffer(static_cast<int>(targetTorrentSize));
+ libtorrent::bencode(std::back_inserter(buffer), _entry);
+ return move(buffer);
+}
+
+const std::string
+filedistribution::
+CreateTorrent::
+fileReference() const
+{
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ libtorrent::torrent_info info(_entry);
+#pragma GCC diagnostic pop
+ return fileReferenceToString(info.info_hash());
+}
diff --git a/filedistribution/src/vespa/filedistribution/manager/createtorrent.h b/filedistribution/src/vespa/filedistribution/manager/createtorrent.h
new file mode 100644
index 00000000000..93d56fa9e6f
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/createtorrent.h
@@ -0,0 +1,23 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vector>
+#include <boost/filesystem/path.hpp>
+#include <libtorrent/create_torrent.hpp>
+
+#include <vespa/filedistribution/common/buffer.h>
+
+namespace filedistribution {
+
+class CreateTorrent {
+ boost::filesystem::path _path;
+ libtorrent::entry _entry;
+public:
+
+ CreateTorrent(const boost::filesystem::path& path);
+ const Move<Buffer> bencode() const;
+ const std::string fileReference() const;
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/manager/field.h b/filedistribution/src/vespa/filedistribution/manager/field.h
new file mode 100644
index 00000000000..6077ad7a341
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/field.h
@@ -0,0 +1,46 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <jni.h>
+
+#include <vespa/filedistribution/common/exception.h>
+
+namespace filedistribution {
+
+struct BadFieldException : std::runtime_error {
+ explicit BadFieldException(const std::string& name)
+ : runtime_error("Could not lookup field '" + name + "'")
+ {}
+
+ BadFieldException(const BadFieldException& e)
+ : runtime_error(e)
+ {}
+};
+
+template <class T>
+class LongField {
+ jfieldID _fieldID;
+public:
+ LongField()
+ {}
+
+ LongField(jclass clazz, const char* fieldName, JNIEnv* env)
+ :_fieldID(env->GetFieldID(clazz, fieldName, "J")) {
+
+ if (!_fieldID)
+ BOOST_THROW_EXCEPTION(BadFieldException(fieldName));
+ }
+
+ void set(jobject obj, T value, JNIEnv* env) {
+ jlong longValue = reinterpret_cast<long>(value);
+ env->SetLongField(obj, _fieldID, longValue);
+ }
+
+ T get(jobject obj, JNIEnv* env) {
+ jlong longValue = env->GetLongField(obj, _fieldID);
+ return reinterpret_cast<T>(longValue);
+ }
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.cpp b/filedistribution/src/vespa/filedistribution/manager/filedb.cpp
new file mode 100644
index 00000000000..9b0a665342a
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/filedb.cpp
@@ -0,0 +1,57 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "filedb.h"
+
+#include <boost/filesystem.hpp>
+
+namespace fs = boost::filesystem;
+
+namespace {
+
+void copyDirectory(fs::path original, fs::path destination)
+{
+ fs::create_directory(destination);
+ for (fs::directory_iterator curr(original), end;
+ curr != end;
+ ++curr) {
+
+ fs::path destPath = destination / curr->path().filename();
+ if ( fs::is_directory(curr->status()) ) {
+ copyDirectory(*curr, destPath);
+ } else {
+ fs::copy_file(*curr, destPath);
+ }
+ }
+}
+
+} //anonymous namespace
+
+filedistribution::
+FileDB::
+FileDB(fs::path dbPath)
+ :_dbPath(dbPath)
+{}
+
+
+void
+filedistribution::
+FileDB::
+add(fs::path original, const std::string& name)
+{
+ fs::path targetPathTemp = _dbPath / (name + ".tmp");
+ if ( fs::exists(targetPathTemp) )
+ fs::remove_all(targetPathTemp);
+
+
+ fs::create_directory(targetPathTemp);
+ if ( !fs::is_directory(original) ) {
+ fs::copy_file(original, targetPathTemp / original.filename());
+ } else {
+ copyDirectory(original, targetPathTemp / original.filename());
+ }
+
+ fs::path targetPath = _dbPath / (name + ".new");
+ if ( fs::exists(targetPath) )
+ fs::remove_all(targetPath);
+ fs::rename(targetPathTemp, targetPath);
+}
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedb.h b/filedistribution/src/vespa/filedistribution/manager/filedb.h
new file mode 100644
index 00000000000..d44436f9e7f
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/filedb.h
@@ -0,0 +1,18 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <string>
+#include <boost/filesystem/path.hpp>
+
+namespace filedistribution {
+
+class FileDB {
+ boost::filesystem::path _dbPath;
+ int _fd;
+public:
+ FileDB(boost::filesystem::path dbPath);
+ void add(boost::filesystem::path original, const std::string& name);
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
new file mode 100644
index 00000000000..fde4302eee8
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/filedistributionmanager.cpp
@@ -0,0 +1,236 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/filedistribution/manager/com_yahoo_vespa_filedistribution_FileDistributionManager.h>
+
+#include <memory>
+#include <boost/lambda/lambda.hpp>
+
+#include <vespa/filedistribution/model/filedistributionmodel.h>
+#include <vespa/filedistribution/model/zkfiledbmodel.h>
+#include <vespa/filedistribution/model/mockfiledistributionmodel.h>
+#include <vespa/filedistribution/model/zkfacade.h>
+#include "jnistring.h"
+#include "field.h"
+#include "createtorrent.h"
+#include "filedb.h"
+
+using namespace filedistribution;
+
+namespace fs = boost::filesystem;
+
+namespace {
+
+class NativeFileDistributionManager {
+ public:
+ std::unique_ptr<FileDBModel> _fileDBModel;
+ std::unique_ptr<FileDB> _fileDB;
+};
+
+LongField<NativeFileDistributionManager*> nativeFileDistributionManagerField;
+
+void throwRuntimeException(const char* msg, JNIEnv* env)
+{
+ //do not mask active exception.
+ if (!env->ExceptionOccurred()) {
+ jclass runtimeExceptionClass = env->FindClass("java/lang/RuntimeException");
+ if (runtimeExceptionClass) {
+ env->ThrowNew(runtimeExceptionClass, msg);
+ }
+ }
+}
+
+template <class FIELD>
+void deleteField(FIELD& field, jobject self, JNIEnv* env)
+{
+ delete field.get(self, env);
+ field.set(self, 0, env);
+}
+
+} //anonymous namespace
+
+
+#define STANDARDCATCH(returnStatement) \
+ catch (const std::bad_alloc&) { \
+ std::cerr<<"Error: Out of memory" <<std::endl; \
+ /*might fail, therefore also uses stderror message*/ \
+ throwRuntimeException("Out of memory", env); \
+ returnStatement; \
+ } catch(const filedistribution::ZKException& e) { \
+ std::stringstream ss; \
+ ss << "In" << __FUNCTION__ << ": "; \
+ ss << diagnosticUserLevelMessage(e); \
+ throwRuntimeException(ss.str().c_str(), env); \
+ returnStatement; \
+ } catch(const std::exception& e) { \
+ throwRuntimeException(e.what(), env); \
+ returnStatement; \
+ }
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setup(
+ JNIEnv *env, jclass self)
+{
+ try {
+ filedistribution::setupZooKeeperLogging();
+ nativeFileDistributionManagerField = LongField<NativeFileDistributionManager*>(self, "nativeFileDistributionManager", env);
+ } STANDARDCATCH()
+}
+
+
+namespace {
+void initMockFileDBModel(NativeFileDistributionManager& manager)
+{
+ manager._fileDBModel.reset(new MockFileDBModel());
+}
+
+void initFileDBModel(NativeFileDistributionManager& manager, const std::string& zkServers)
+{
+ //Ignored for now, since we're not installing any watchers.
+ boost::shared_ptr<ExceptionRethrower> ignoredRethrower(new ExceptionRethrower());
+ boost::shared_ptr<ZKFacade> zk(new ZKFacade(zkServers, ignoredRethrower));
+ manager._fileDBModel.reset(new ZKFileDBModel(zk));
+}
+} //end anonymous namespace
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_init(
+ JNIEnv *env, jobject self,
+ jbyteArray fileDBPathArg, jbyteArray zkServersArg)
+{
+ try {
+ JNIString zkServers(zkServersArg, env);
+
+ nativeFileDistributionManagerField.set(self, new NativeFileDistributionManager(), env);
+ NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env);
+ if (zkServers._value == "mockfiledistributionmodel.testing") {
+ initMockFileDBModel(manager);
+ } else {
+ initFileDBModel(manager, zkServers._value);
+ }
+
+ JNIString fileDBPath(fileDBPathArg, env);
+ manager._fileDB.reset(new FileDB(fileDBPath._value));
+
+ } STANDARDCATCH()
+}
+
+
+JNIEXPORT
+jstring JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_addFileImpl(
+ JNIEnv *env, jobject self,
+ jbyteArray completePathArg)
+{
+ try {
+ JNIString completePath(completePathArg, env);
+ CreateTorrent createTorrent(completePath._value);
+
+ std::string fileReference = createTorrent.fileReference();
+ NativeFileDistributionManager& manager = *nativeFileDistributionManagerField.get(self, env);
+
+ manager._fileDB->add(completePath._value, fileReference);
+
+ FileDBModel& model = *manager._fileDBModel;
+ if (! model.hasFile(fileReference) ) {
+ model.addFile(fileReference, createTorrent.bencode());
+ }
+
+ //contains string with the characters 0-9 a-f
+ return env->NewStringUTF(fileReference.c_str());
+ } STANDARDCATCH(return 0)
+}
+
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_shutdown(
+ JNIEnv *env, jobject self)
+{
+ deleteField(nativeFileDistributionManagerField, self, env);
+}
+
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_setDeployedFilesImpl(
+ JNIEnv *env, jobject self, jbyteArray hostNameArg,
+ jbyteArray appIdArg, jobjectArray fileReferencesArg)
+{
+ try {
+ JNIString hostName(hostNameArg, env);
+ JNIString appId(appIdArg, env);
+ JNIArray<JNIString> fileReferences(fileReferencesArg, env);
+
+ nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
+ setDeployedFilesToDownload(hostName._value, appId._value, fileReferences._value);
+ } STANDARDCATCH()
+}
+
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_limitSendingOfDeployedFilesToImpl(
+ JNIEnv *env, jobject self, jobjectArray hostNamesArg, jbyteArray appIdArg)
+{
+ try {
+ JNIArray<JNIString> hostNames(hostNamesArg, env);
+ JNIString appId(appIdArg, env);
+
+ nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
+ cleanDeployedFilesToDownload(hostNames._value, appId._value);
+ } STANDARDCATCH()
+}
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_removeDeploymentsThatHaveDifferentApplicationIdImpl(
+ JNIEnv *env, jobject self, jobjectArray hostNamesArg, jbyteArray appIdArg)
+{
+ try {
+ JNIArray<JNIString> hostNames(hostNamesArg, env);
+ JNIString appId(appIdArg, env);
+
+ nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
+ removeDeploymentsThatHaveDifferentApplicationId(hostNames._value, appId._value);
+ } STANDARDCATCH()
+}
+
+
+
+JNIEXPORT
+void JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_limitFilesTo(
+ JNIEnv *env, jobject self, jobjectArray fileReferencesArg)
+{
+ try {
+ JNIArray<JNIString> fileReferences(fileReferencesArg, env);
+
+ nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
+ cleanFiles(fileReferences._value);
+ } STANDARDCATCH()
+}
+
+
+JNIEXPORT
+jbyteArray JNICALL
+Java_com_yahoo_vespa_filedistribution_FileDistributionManager_getProgressImpl(
+ JNIEnv *env, jobject self, jbyteArray fileReferenceArg, jobjectArray hostNamesArg)
+{
+ try {
+ JNIString fileReference(fileReferenceArg, env);
+ JNIArray<JNIString> hostNames(hostNamesArg, env);
+
+ const filedistribution::FileDBModel::Progress progress =
+ nativeFileDistributionManagerField.get(self, env)->_fileDBModel->
+ getProgress(fileReference._value, hostNames._value);
+
+ jbyteArray result = env->NewByteArray(progress.size());
+ if (!result)
+ return 0; //exception thrown when returning
+
+ env->SetByteArrayRegion(result, 0, progress.size(), &*progress.begin());
+ return result;
+ } STANDARDCATCH(return 0)
+}
diff --git a/filedistribution/src/vespa/filedistribution/manager/jnistring.h b/filedistribution/src/vespa/filedistribution/manager/jnistring.h
new file mode 100644
index 00000000000..aa5d459232e
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/jnistring.h
@@ -0,0 +1,78 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <exception>
+#include <string>
+
+#include <jni.h>
+
+namespace filedistribution {
+
+class JNIString {
+ struct Repr {
+ JNIEnv* _env;
+ jbyteArray _str;
+ jint _size;
+ char* _repr;
+
+ Repr(jbyteArray str, JNIEnv* env)
+ :_env(env),
+ _str(str),
+ _size(_env->GetArrayLength(str)),
+ _repr(reinterpret_cast<char*>(_env->GetPrimitiveArrayCritical(str, 0))) {
+
+ if (!_repr)
+ throw std::bad_alloc();
+ }
+
+ ~Repr() {
+ _env->ReleasePrimitiveArrayCritical(_str, _repr, 0);
+ _env->DeleteLocalRef(_str);
+ }
+ };
+public:
+ typedef jbyteArray JavaValue;
+ typedef std::string Value;
+ Value _value;
+
+ JNIString(jbyteArray str, JNIEnv* env) {
+ Repr repr(str, env);
+ Value result(repr._repr, repr._repr + repr._size);
+ _value.swap(result);
+ }
+};
+
+class JNIUtf8String {
+public:
+ typedef jstring JavaValue;
+ typedef std::string Value;
+ std::string _value;
+
+ JNIUtf8String(jstring str, JNIEnv* env)
+ :_value(env->GetStringUTFLength(str), 0)
+ {
+ env->GetStringUTFRegion(str, 0, _value.begin() - _value.end(), &*_value.begin());
+ env->DeleteLocalRef(str);
+ }
+};
+
+template <class JNITYPE>
+class JNIArray {
+public:
+ std::vector<typename JNITYPE::Value> _value;
+
+ JNIArray(jobjectArray array, JNIEnv* env) {
+ jsize length = env->GetArrayLength(array);
+ _value.reserve(length);
+
+ for (jsize i=0; i<length; ++i) {
+ jobject element = env->GetObjectArrayElement(array, i);
+ JNITYPE elementValue(static_cast<typename JNITYPE::JavaValue>(element), env);
+ _value.push_back(elementValue._value);
+ }
+ env->DeleteLocalRef(array);
+ }
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp b/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp
new file mode 100644
index 00000000000..686dffa5c00
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/manager/stderr_logfwd.cpp
@@ -0,0 +1,26 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/filedistribution/common/logfwd.h>
+
+#include <stdarg.h>
+#include <iostream>
+#include <boost/scoped_array.hpp>
+#include <stdio.h>
+
+
+
+void filedistribution::logfwd::log(LogLevel level, const char* file, int line, const char* fmt, ...)
+{
+ if (level == debug || level == info)
+ return;
+
+ const size_t maxSize(0x8000);
+ boost::scoped_array<char> payload(new char[maxSize]);
+
+ va_list args;
+ va_start(args, fmt);
+ vsnprintf(payload.get(), maxSize, fmt, args);
+ va_end(args);
+
+ std::cerr <<"Error: " <<payload.get() <<" File: " <<file <<" Line: " <<line <<std::endl;
+}
diff --git a/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt
new file mode 100644
index 00000000000..58031d1a7f4
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/CMakeLists.txt
@@ -0,0 +1,20 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_filedbmodel OBJECT
+ SOURCES
+ zkfiledbmodel.cpp
+ zkfacade.cpp
+ deployedfilestodownload.cpp
+ DEPENDS
+)
+vespa_add_library(filedistribution_filedistributionmodel STATIC
+ SOURCES
+ deployedfilestodownload.cpp
+ filedistributionmodelimpl.cpp
+ zkfacade.cpp
+ zkfiledbmodel.cpp
+ DEPENDS
+)
+vespa_generate_config(filedistribution_filedistributionmodel filereferences.def)
+
+vespa_add_target_external_dependency(filedistribution_filedistributionmodel zookeeper_mt)
+install(FILES filereferences.def DESTINATION var/db/vespa/config_server/serverdb/classes)
diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
new file mode 100644
index 00000000000..733d60d91bb
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.cpp
@@ -0,0 +1,158 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "deployedfilestodownload.h"
+
+#include <sstream>
+#include <iterator>
+
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+
+#include <vespa/filedistribution/common/logfwd.h>
+
+using filedistribution::DeployedFilesToDownload;
+
+typedef std::vector<std::string> StringVector;
+typedef boost::filesystem::path Path;
+
+namespace filedistribution {
+
+const Path getApplicationIdPath(const Path & parent) { return parent / "appId"; }
+
+const std::string
+readApplicationId(filedistribution::ZKFacade & zk, const Path & deployNode)
+{
+ if (zk.hasNode(getApplicationIdPath(deployNode))) {
+ return zk.getString(getApplicationIdPath(deployNode));
+ }
+ return "default:default:default";
+}
+
+}
+
+const DeployedFilesToDownload::Path
+DeployedFilesToDownload::addNewDeployNode(Path parentPath, const FileReferences& files) {
+ Path path = parentPath / "deploy_";
+
+ std::ostringstream filesStream;
+ if (!files.empty()) {
+ filesStream << files[0];
+ std::for_each(files.begin() +1, files.end(),
+ filesStream <<boost::lambda::constant('\n') <<boost::lambda::_1);
+ }
+ Path retPath = _zk.createSequenceNode(path, filesStream.str().c_str(), filesStream.str().length());
+ return retPath;
+}
+
+void
+DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath) {
+ std::map<std::string, StringVector> childrenPerId(groupChildrenByAppId(parentPath, _zk.getChildren(parentPath)));
+ for (auto & kv : childrenPerId) {
+ deleteExpiredDeployNodes(parentPath, kv.second);
+ }
+}
+
+std::map<std::string, StringVector>
+DeployedFilesToDownload::groupChildrenByAppId(const Path & parentPath, const StringVector & children)
+{
+ std::map<std::string, StringVector> childrenById;
+ std::for_each(std::begin(children), std::end(children),
+ [&](const std::string & childName)
+ {
+ Path childPath = parentPath / childName;
+ std::string appId(readApplicationId(_zk, childPath));
+ childrenById[appId].push_back(childName);
+ });
+ return childrenById;
+}
+
+void
+DeployedFilesToDownload::deleteExpiredDeployNodes(Path parentPath, StringVector children)
+{
+ if (children.size() > numberOfDeploysToKeepFiles) {
+ std::sort(children.begin(), children.end());
+
+ size_t numberOfNodesToDelete = children.size() - numberOfDeploysToKeepFiles;
+ std::for_each(children.begin(), children.begin() + numberOfNodesToDelete,
+ boost::lambda::bind(&ZKFacade::remove, &_zk,
+ boost::lambda::ret<Path>(parentPath / boost::lambda::_1)));
+ }
+}
+
+void
+DeployedFilesToDownload::addAppIdToDeployNode(const Path & deployNode, const std::string & appId)
+{
+ _zk.setData(getApplicationIdPath(deployNode), appId.c_str(), appId.length());
+}
+
+void
+DeployedFilesToDownload::setDeployedFilesToDownload(
+ const std::string& hostName,
+ const std::string& applicationId,
+ const FileReferences& files) {
+ Path parentPath = getPath(hostName);
+ _zk.setData(parentPath, "", 0);
+
+ const Path deployNode(addNewDeployNode(parentPath, files));
+ addAppIdToDeployNode(deployNode, applicationId);
+ deleteExpiredDeployNodes(parentPath);
+}
+
+//Nothrow
+template <typename INSERT_ITERATOR>
+void
+DeployedFilesToDownload::readDeployFile(const Path& path, INSERT_ITERATOR insertionIterator) {
+ LOGFWD(debug, "Reading deploy file '%s", path.string().c_str());
+
+
+ try {
+ Buffer buffer(_zk.getData(path));
+ std::string stringBuffer(buffer.begin(), buffer.end());
+ std::istringstream stream(stringBuffer);
+
+ typedef std::istream_iterator<std::string> iterator;
+ std::copy(iterator(stream), iterator(), insertionIterator);
+ } catch (const ZKNodeDoesNotExistsException& e) {
+ //Node deleted, no problem.
+ LOGFWD(debug, "Deploy file '%s' deleted.", path.string().c_str());
+ }
+}
+
+const DeployedFilesToDownload::FileReferences
+DeployedFilesToDownload::getDeployedFilesToDownload(
+ const std::string& hostName,
+ const ZKFacade::NodeChangedWatcherSP& watcher) {
+
+ try {
+ StringVector deployedFiles = _zk.getChildren(getPath(hostName), watcher);
+ FileReferences fileReferences;
+
+ for (StringVector::iterator i = deployedFiles.begin(); i != deployedFiles.end(); ++i) {
+ readDeployFile(getPath(hostName) / *i, std::back_inserter(fileReferences));
+ }
+
+ return fileReferences;
+ } catch (const ZKNodeDoesNotExistsException&) {
+ //Add watch waiting for the node to appear:
+ if (_zk.hasNode(getPath(hostName), watcher)) {
+ return getDeployedFilesToDownload(hostName, watcher);
+ } else {
+ return FileReferences();
+ }
+ }
+}
+
+const DeployedFilesToDownload::FileReferences
+DeployedFilesToDownload::getLatestDeployedFilesToDownload(const std::string& hostName)
+{
+ StringVector deployedFiles = _zk.getChildren(getPath(hostName));
+ std::sort(deployedFiles.begin(), deployedFiles.end());
+
+ FileReferences fileReferences;
+ if (deployedFiles.empty()) {
+ return fileReferences;
+ } else {
+ readDeployFile(getPath(hostName) / *deployedFiles.rbegin(), std::back_inserter(fileReferences));
+ return fileReferences;
+ }
+}
diff --git a/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h
new file mode 100644
index 00000000000..aeed0922fc8
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/deployedfilestodownload.h
@@ -0,0 +1,56 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/filesystem/path.hpp>
+#include "zkfacade.h"
+#include "zkfiledbmodel.h"
+
+namespace filedistribution {
+
+const std::string readApplicationId(ZKFacade & zk, const boost::filesystem::path & deployNode);
+
+class DeployedFilesToDownload {
+ //includes the current deploy run;
+ static const size_t numberOfDeploysToKeepFiles = 2;
+ typedef boost::filesystem::path Path;
+
+ ZKFacade& _zk;
+
+ Path getPath(const std::string& hostName) {
+ return ZKFileDBModel::_hostsPath / hostName;
+ }
+
+ //Nothrow
+ template <typename INSERT_ITERATOR>
+ void readDeployFile(const boost::filesystem::path& path, INSERT_ITERATOR insertionIterator);
+ void addAppIdToDeployNode(const Path & deployNode, const std::string & appId);
+ std::map<std::string, std::vector<std::string> > groupChildrenByAppId(const Path & parentPath, const std::vector<std::string> & children);
+ void deleteExpiredDeployNodes(Path parentPath, std::vector<std::string> children);
+
+public:
+ typedef std::vector<std::string> FileReferences;
+
+ DeployedFilesToDownload(ZKFacade* zk)
+ :_zk(*zk)
+ {}
+
+ const Path addNewDeployNode(Path parentPath, const FileReferences& files);
+
+ void deleteExpiredDeployNodes(Path parentPath);
+
+ void setDeployedFilesToDownload(
+ const std::string& hostName,
+ const std::string& applicationId,
+ const FileReferences& files);
+
+ /** For all the deploys available **/
+ const FileReferences getDeployedFilesToDownload(
+ const std::string& hostName,
+ const ZKFacade::NodeChangedWatcherSP& watcher);
+
+ /** For the current deploy only **/
+ const FileReferences getLatestDeployedFilesToDownload(const std::string& hostName);
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/model/filedbmodel.h b/filedistribution/src/vespa/filedistribution/model/filedbmodel.h
new file mode 100644
index 00000000000..1c8ad181306
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/filedbmodel.h
@@ -0,0 +1,56 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <string>
+#include <vector>
+#include <vespa/filedistribution/common/buffer.h>
+#include <vespa/filedistribution/common/exception.h>
+
+namespace filedistribution {
+
+struct InvalidProgressException : public Exception {
+ const char* what() const throw() {
+ return "Invalid progress information reported by one of the filedistributors";
+ }
+};
+
+struct FileDoesNotExistException : public Exception {};
+
+class FileDBModel : boost::noncopyable {
+public:
+ class InvalidHostStatusException : public Exception {};
+ struct HostStatus {
+ enum State { finished, inProgress, notStarted };
+
+ State _state;
+ size_t _numFilesToDownload;
+ size_t _numFilesFinished;
+ };
+
+ virtual ~FileDBModel();
+
+ virtual bool hasFile(const std::string& fileReference) = 0;
+ virtual void addFile(const std::string& fileReference, const Buffer& buffer) = 0;
+ virtual Move<Buffer> getFile(const std::string& fileReference) = 0;
+ virtual void cleanFiles(const std::vector<std::string>& filesToPreserve) = 0;
+
+ virtual void setDeployedFilesToDownload(const std::string& hostName,
+ const std::string & appId,
+ const std::vector<std::string> & files) = 0;
+ virtual void cleanDeployedFilesToDownload(
+ const std::vector<std::string> & hostsToPreserve,
+ const std::string& appId) = 0;
+ virtual void removeDeploymentsThatHaveDifferentApplicationId(
+ const std::vector<std::string> & hostsToPreserve,
+ const std::string& appId) = 0;
+ virtual std::vector<std::string> getHosts() = 0;
+
+ virtual HostStatus getHostStatus(const std::string& hostName) = 0;
+ //TODO: does not really belong here, refactor.
+ typedef std::vector<int8_t> Progress; // [0-100]
+ virtual Progress getProgress(const std::string& fileReference,
+ const std::vector<std::string>& hostsSortedAscending) = 0;
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h
new file mode 100644
index 00000000000..516e183490e
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodel.h
@@ -0,0 +1,43 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vector>
+#include <memory>
+#include <string>
+#include <set>
+
+#include <boost/noncopyable.hpp>
+#include <boost/filesystem/path.hpp>
+#include <boost/signals2.hpp>
+
+#include <libtorrent/peer.hpp>
+#include <vespa/filedistribution/common/buffer.h>
+#include <vespa/filedistribution/common/exception.h>
+#include "filedbmodel.h"
+
+namespace filedistribution {
+
+class FileDistributionModel : boost::noncopyable {
+public:
+ class NotPeer : public Exception {};
+
+ typedef boost::signals2::signal<void ()> FilesToDownloadChangedSignal;
+ typedef std::vector<libtorrent::peer_entry> PeerEntries;
+
+ virtual FileDBModel& getFileDBModel() = 0;
+
+ virtual std::set<std::string> getFilesToDownload() = 0;
+
+ virtual PeerEntries getPeers(const std::string& fileReference, size_t maxPeers) = 0;
+ virtual void addPeer(const std::string& fileReference) = 0;
+ virtual void removePeer(const std::string& fileReference) = 0;
+ virtual void peerFinished(const std::string& fileReference) = 0; //throws NotPeer
+
+ virtual ~FileDistributionModel() {}
+
+ FilesToDownloadChangedSignal _filesToDownloadChanged;
+};
+
+} //namespace filedistribution
+
+
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
new file mode 100644
index 00000000000..5b9de93249a
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
@@ -0,0 +1,239 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "filedistributionmodel.h"
+
+#include <vector>
+#include <set>
+#include <string>
+#include <cstdlib>
+
+#include <boost/filesystem.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
+#include <zookeeper/zookeeper.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".filedistributionmodel");
+
+#include "zkfiledbmodel.h"
+#include "deployedfilestodownload.h"
+#include "filedistributionmodelimpl.h"
+
+namespace fs = boost::filesystem;
+
+using filedistribution::ZKFileDBModel;
+
+namespace {
+//peer format: hostName:port
+
+
+void
+addPeerEntry(const std::string& peer,
+ filedistribution::FileDistributionModelImpl::PeerEntries& result) {
+
+ try {
+ libtorrent::peer_entry peerEntry;
+ peerEntry.pid.clear();
+
+ std::istringstream stream(peer);
+ stream.exceptions ( std::istream::failbit | std::istream::badbit );
+
+ std::getline(stream, peerEntry.ip, ZKFileDBModel::_peerEntrySeparator);
+ stream >> peerEntry.port;
+
+ result.push_back(peerEntry);
+ } catch (const std::exception&) {
+ LOG(warning, "Invalid peer entry: '%s'", peer.c_str());
+ //Ignore invalid peer entries
+ }
+}
+
+std::vector<std::string>::iterator
+prunePeers(std::vector<std::string> &peers, size_t maxPeers) {
+ if (peers.size() <= maxPeers)
+ return peers.end();
+
+ assert (maxPeers < 2147483648); //due to the usage of rand()
+
+ const size_t peersSize = peers.size();
+ for (size_t i=0; i<maxPeers; ++i) {
+ //i <= i + (std::rand() % ( peersSize -i )) <= peerSize - 1
+ size_t candidateBetween_i_and_peerSize = i + ( std::rand() % ( peersSize -i ));
+ std::swap(peers[i], peers[candidateBetween_i_and_peerSize]);
+ }
+
+ return peers.begin() + maxPeers;
+}
+
+} //anonymous namespace
+
+using filedistribution::FileDistributionModelImpl;
+
+struct FileDistributionModelImpl::DeployedFilesChangedCallback :
+ public ZKFacade::NodeChangedWatcher
+{
+ typedef boost::shared_ptr<DeployedFilesChangedCallback> SP;
+
+ boost::weak_ptr<FileDistributionModelImpl> _parent;
+
+ DeployedFilesChangedCallback(
+ const boost::shared_ptr<FileDistributionModelImpl> & parent)
+ :_parent(parent)
+ {}
+
+ //override
+ void operator()() {
+ if (boost::shared_ptr<FileDistributionModelImpl> model = _parent.lock()) {
+ model->_filesToDownloadChanged();
+ }
+ }
+};
+
+
+FileDistributionModelImpl::~FileDistributionModelImpl() {
+ LOG(debug, "Deconstructing FileDistributionModelImpl");
+}
+
+FileDistributionModelImpl::PeerEntries
+FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t maxPeers) {
+ try {
+ fs::path path = _fileDBModel.getPeersPath(fileReference);
+
+ typedef std::vector<std::string> Peers;
+ Peers peers = _zk->getChildren(path);
+ // TODO: Take this port from some config somewhere instead of hardcoding
+ addConfigServersAsPeers(peers, getenv("services__addr_configserver"), 19093);
+
+ Peers::iterator end = prunePeers(peers, maxPeers);
+
+ PeerEntries result;
+ result.reserve(end - peers.begin());
+
+ namespace ll=boost::lambda;
+ std::for_each(peers.begin(), end,
+ ll::bind(&addPeerEntry, boost::lambda::_1, boost::ref(result)));
+
+ LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str());
+ return result;
+ } catch(ZKNodeDoesNotExistsException&) {
+ LOG(debug, ("No peer entries available for " + fileReference).c_str());
+ return PeerEntries();
+ }
+}
+
+fs::path
+FileDistributionModelImpl::getPeerEntryPath(const std::string& fileReference) {
+ std::ostringstream entry;
+ entry <<_hostName
+ <<ZKFileDBModel::_peerEntrySeparator <<_port;
+
+ return _fileDBModel.getPeersPath(fileReference) / entry.str();
+}
+
+void
+FileDistributionModelImpl::addPeer(const std::string& fileReference) {
+ fs::path path = getPeerEntryPath(fileReference);
+ LOG(debug, "Adding peer '%s'", path.string().c_str());
+
+ if (_zk->hasNode(path)) {
+ LOG(info, "Retiring previous peer node owner.");
+ _zk->removeIfExists(path);
+ }
+ _zk->addEphemeralNode(path);
+}
+
+void
+FileDistributionModelImpl::removePeer(const std::string& fileReference) {
+ fs::path path = getPeerEntryPath(fileReference);
+ LOG(debug, "Removing peer '%s'", path.string().c_str());
+
+ _zk->removeIfExists(path);
+}
+
+//Assumes that addPeer has been called before the torrent was started,
+//so that we avoid the race condition between finishing downloading a torrent
+//and setting peer status
+void
+FileDistributionModelImpl::peerFinished(const std::string& fileReference) {
+ fs::path path = getPeerEntryPath(fileReference);
+ LOG(debug, "Peer finished '%s'", path.string().c_str());
+
+ try {
+ bool mustExist = true;
+ char progress = 100; //percent
+
+ _zk->setData(path, &progress, sizeof(char), mustExist);
+ } catch(ZKNodeDoesNotExistsException&) {
+ BOOST_THROW_EXCEPTION(NotPeer());
+ }
+}
+
+std::set<std::string>
+FileDistributionModelImpl::getFilesToDownload() {
+ DeployedFilesToDownload d(_zk.get());
+ std::vector<std::string> deployed = d.getDeployedFilesToDownload(_hostName,
+ DeployedFilesChangedCallback::SP(
+ new DeployedFilesChangedCallback(shared_from_this())));
+
+ std::set<std::string> result(deployed.begin(), deployed.end());
+
+ {
+ LockGuard guard(_activeFileReferencesMutex);
+ result.insert(_activeFileReferences.begin(), _activeFileReferences.end());
+ }
+ return result;
+}
+
+bool
+FileDistributionModelImpl::updateActiveFileReferences(
+ const std::vector<vespalib::string>& fileReferences) {
+
+ std::vector<vespalib::string> sortedFileReferences(fileReferences);
+ std::sort(sortedFileReferences.begin(), sortedFileReferences.end());
+
+ LockGuard guard(_activeFileReferencesMutex);
+ bool changed =
+ sortedFileReferences != _activeFileReferences;
+
+ sortedFileReferences.swap(_activeFileReferences);
+ return changed;
+}
+
+void
+FileDistributionModelImpl::addConfigServersAsPeers(
+ std::vector<std::string> & peers, char const* envConfigServers, int port) {
+
+ std::set<std::string> peersFromTracker(peers.begin(), peers.end());
+
+ if (envConfigServers == NULL) {
+ // Could be standalone cluster (not set for this).
+ return;
+ }
+ std::string configServerCommaListed(envConfigServers);
+ std::stringstream configserverstream(configServerCommaListed);
+ std::string configserver;
+ while (std::getline(configserverstream, configserver, ',')) {
+ configserver += ":" + std::to_string(port);
+ if (peersFromTracker.find(configserver) == peersFromTracker.end()) {
+ peers.push_back(configserver);
+ LOG(debug, "Adding configserver '%s'", configserver.c_str());
+ } else {
+ LOG(debug, "Configserver already added '%s'", configserver.c_str());
+ }
+ }
+}
+
+
+
+void
+FileDistributionModelImpl::configure(std::unique_ptr<FilereferencesConfig> config) {
+ try {
+ const bool changed = updateActiveFileReferences(config->filereferences);
+ if (changed)
+ _filesToDownloadChanged();
+ } catch(...) {
+ _exceptionRethrower->store(boost::current_exception());
+ }
+}
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
new file mode 100644
index 00000000000..04a111a00df
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.h
@@ -0,0 +1,76 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/enable_shared_from_this.hpp>
+
+#include "filedistributionmodel.h"
+#include <vespa/filedistribution/model/config-filereferences.h>
+#include "zkfacade.h"
+#include "zkfiledbmodel.h"
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+#include <vespa/config/config.h>
+
+using cloud::config::filedistribution::FilereferencesConfig;
+
+namespace filedistribution {
+
+class FileDistributionModelImpl : public FileDistributionModel,
+ public config::IFetcherCallback<FilereferencesConfig>,
+ public boost::enable_shared_from_this<FileDistributionModelImpl>
+{
+ struct DeployedFilesChangedCallback;
+
+ const std::string _hostName;
+ const int _port;
+
+ const boost::shared_ptr<ZKFacade> _zk;
+ ZKFileDBModel _fileDBModel;
+
+ boost::mutex _activeFileReferencesMutex;
+ typedef boost::lock_guard<boost::mutex> LockGuard;
+ std::vector<vespalib::string> _activeFileReferences;
+
+ const boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+
+ bool /*changed*/
+ updateActiveFileReferences(const std::vector<vespalib::string>& fileReferences);
+
+ ZKFacade::Path getPeerEntryPath(const std::string& fileReference);
+public:
+ FileDistributionModelImpl(const std::string& hostName, int port,
+ const boost::shared_ptr<ZKFacade>& zk,
+ const boost::shared_ptr<ExceptionRethrower>& exceptionRethrower)
+ :_hostName(hostName),
+ _port(port),
+ _zk(zk),
+ _fileDBModel(_zk),
+ _exceptionRethrower(exceptionRethrower)
+ {
+ /* Hack: Force the first call to updateActiveFileReferences to return changed=true
+ when the file references config is empty.
+ This ensures that the "deployed files to download" nodes in zookeeper are read at start up.
+ */
+ _activeFileReferences.push_back("force-initial-files-to-download-changed-signal");
+ }
+
+ ~FileDistributionModelImpl();
+
+ //overrides FileDistributionModel
+ FileDBModel& getFileDBModel() {
+ return _fileDBModel;
+ }
+
+ std::set<std::string> getFilesToDownload();
+
+ PeerEntries getPeers(const std::string& fileReference, size_t maxPeers);
+ void addPeer(const std::string& fileReference);
+ void removePeer(const std::string& fileReference);
+ void peerFinished(const std::string& fileReference);
+ void addConfigServersAsPeers(std::vector<std::string>& peers, char const* envConfigServer, int port);
+
+ //Overrides Subscriber
+ void configure(std::unique_ptr<FilereferencesConfig> config);
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/model/filereferences.def b/filedistribution/src/vespa/filedistribution/model/filereferences.def
new file mode 100644
index 00000000000..cd7212f0821
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/filereferences.def
@@ -0,0 +1,3 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+namespace=cloud.config.filedistribution
+filereferences[] string
diff --git a/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h b/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h
new file mode 100644
index 00000000000..169225deaf2
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/mockfiledistributionmodel.h
@@ -0,0 +1,61 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "filedistributionmodel.h"
+
+#include <algorithm>
+#include <vector>
+
+namespace filedistribution {
+
+class MockFileDBModel : public FileDBModel {
+ std::vector<std::string> _fileReferences;
+public:
+//Overrides
+ bool hasFile(const std::string& fileReference) {
+ return std::find(_fileReferences.begin(), _fileReferences.end(), fileReference) != _fileReferences.end();
+ }
+
+ void addFile(const std::string& fileReference, const Buffer & buffer) {
+ (void)buffer;
+ _fileReferences.push_back(fileReference);
+ }
+
+ Move<Buffer> getFile(const std::string& fileReference) {
+ (void)fileReference;
+ const char* resultStr = "result";
+ Buffer result(resultStr, resultStr + strlen(resultStr));
+ return move(result);
+ }
+
+ virtual void cleanFiles(
+ const std::vector<std::string> &) {}
+
+
+ virtual void setDeployedFilesToDownload(const std::string&,
+ const std::string&,
+ const std::vector<std::string> &) {}
+ virtual void cleanDeployedFilesToDownload(
+ const std::vector<std::string> &,
+ const std::string&) {}
+ virtual void removeDeploymentsThatHaveDifferentApplicationId(
+ const std::vector<std::string> &,
+ const std::string&) {}
+
+ virtual std::vector<std::string> getHosts() {
+ return std::vector<std::string>();
+ }
+
+ virtual HostStatus getHostStatus(const std::string&) {
+ return HostStatus();
+ }
+
+ Progress getProgress(const std::string&,
+ const std::vector<std::string>&) {
+ return Progress();
+ }
+};
+
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
new file mode 100644
index 00000000000..71c93be1b98
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp
@@ -0,0 +1,648 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+
+#include "zkfacade.h"
+
+#include <iostream>
+#include <unistd.h>
+#include <signal.h>
+#include <cassert>
+#include <cstdio>
+#include <sstream>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/throw_exception.hpp>
+#include <boost/function_output_iterator.hpp>
+#include <boost/thread.hpp>
+
+#include <zookeeper/zookeeper.h>
+#include <vespa/filedistribution/common/logfwd.h>
+#include <vespa/defaults.h>
+
+typedef boost::unique_lock<boost::mutex> UniqueLock;
+
+using filedistribution::ZKFacade;
+using filedistribution::Move;
+using filedistribution::Buffer;
+using filedistribution::ZKGenericException;
+
+typedef ZKFacade::Path Path;
+
+namespace {
+
+class RetryController {
+ unsigned int _retryCount;
+ ZKFacade& _zkFacade;
+
+ static const unsigned int _maxRetries = 10;
+public:
+ int _lastStatus;
+
+ RetryController(ZKFacade* zkFacade)
+ :_retryCount(0),
+ _zkFacade(*zkFacade),
+ _lastStatus(0)
+ {}
+
+ void operator()(int status) {
+ _lastStatus = status;
+ }
+
+ bool shouldRetry() {
+ ++_retryCount;
+
+ return _zkFacade.retriesEnabled() &&
+ _lastStatus != ZOK &&
+ _retryCount < _maxRetries &&
+ isNonLastingError(_lastStatus) &&
+ pause();
+ }
+
+ bool isNonLastingError(int error) {
+ return error == ZCONNECTIONLOSS ||
+ error == ZOPERATIONTIMEOUT;
+ }
+
+ bool pause() {
+ unsigned int sleepInSeconds = 1;
+ sleep(sleepInSeconds);
+ LOGFWD(info, "Retrying zookeeper operation.");
+ return true;
+ }
+
+ void throwIfError() {
+ namespace fd = filedistribution;
+
+ switch (_lastStatus) {
+ case ZSESSIONEXPIRED:
+ BOOST_THROW_EXCEPTION(fd::ZKSessionExpired());
+ case ZNONODE:
+ BOOST_THROW_EXCEPTION(fd::ZKNodeDoesNotExistsException());
+ case ZNODEEXISTS:
+ BOOST_THROW_EXCEPTION(fd::ZKNodeExistsException());
+ default:
+ if (_lastStatus != ZOK)
+ BOOST_THROW_EXCEPTION(fd::ZKGenericException(_lastStatus));
+ }
+ }
+};
+
+class DeallocateZKStringVectorGuard {
+ String_vector& _strings;
+public:
+ DeallocateZKStringVectorGuard(String_vector& strings)
+ :_strings(strings)
+ {}
+
+ ~DeallocateZKStringVectorGuard() {
+ deallocate_String_vector(&_strings);
+ }
+};
+
+const Path
+setDataForNewFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle, int createFlags) {
+
+ RetryController retryController(&zk);
+ const int maxPath = 1024;
+ char createdPath[maxPath];
+ do {
+ retryController(
+ zoo_create(zhandle, path.string().c_str(),
+ buffer, length,
+ &ZOO_OPEN_ACL_UNSAFE, createFlags,
+ createdPath, maxPath));
+ } while (retryController.shouldRetry());
+
+ retryController.throwIfError();
+ return Path(createdPath);
+}
+
+void
+setDataForExistingFile(ZKFacade& zk, const Path& path, const char* buffer, int length, zhandle_t* zhandle) {
+ RetryController retryController(&zk);
+
+ const int ignoreVersion = -1;
+ do {
+ retryController(
+ zoo_set(zhandle, path.string().c_str(),
+ buffer, length, ignoreVersion));
+ } while (retryController.shouldRetry());
+
+ retryController.throwIfError();
+}
+
+} //anonymous namespace
+
+/********** Active watchers *******************************************/
+struct ZKFacade::ZKWatcher {
+ const boost::weak_ptr<ZKFacade> _owner;
+ const NodeChangedWatcherSP _nodeChangedWatcher;
+
+ ZKWatcher(
+ const boost::shared_ptr<ZKFacade> &owner,
+ const NodeChangedWatcherSP& nodeChangedWatcher )
+ :_owner(owner),
+ _nodeChangedWatcher(nodeChangedWatcher)
+ {}
+
+ static void watcherFn(zhandle_t *zh, int type,
+ int state, const char *path,void *watcherContext) {
+
+ (void)zh;
+ (void)state;
+ (void)path;
+
+ if (type == ZOO_SESSION_EVENT) {
+ //The session events do not cause unregistering of the watcher
+ //inside zookeeper, so don't unregister it here.
+ LOGFWD(debug, "ZKWatcher recieved session event with state '%d'. Ignoring", state);
+ return;
+ }
+
+ LOGFWD(debug, "ZKWatcher: Begin watcher called for path '%s' with type %d.", path, type);
+
+ ZKWatcher* self = static_cast<ZKWatcher*>(watcherContext);
+
+ //WARNING: Since we're creating a shared_ptr to ZKFacade here, this might cause
+ //destruction of the ZKFacade in a zookeeper thread.
+ //Since zookeeper_close blocks until all watcher threads are finished, and we're inside a watcher thread,
+ //this will cause infinite waiting.
+ //To avoid this, a custom shared_ptr deleter using a separate deleter thread must be used.
+
+ if (boost::shared_ptr<ZKFacade> zk = self->_owner.lock()) {
+ zk->invokeWatcher(watcherContext);
+ }
+
+ LOGFWD(debug, "ZKWatcher: End watcher called for path '%s' with type %d.", path, type);
+ }
+};
+
+void
+ZKFacade::stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context) {
+ (void)path;
+
+ //The ZKFacade won't expire before zookeeper_close has finished.
+ ZKFacade* self = (ZKFacade*)context;
+ if (type == ZOO_SESSION_EVENT) {
+ LOGFWD(debug, "Zookeeper session event: %d", state);
+ if (state == ZOO_EXPIRED_SESSION_STATE) {
+ self->_exceptionRethrower->store(ZKSessionExpired());
+ } else if (state == ZOO_AUTH_FAILED_STATE) {
+ self->_exceptionRethrower->store(ZKGenericException(ZNOAUTH));
+ }
+ } else {
+ LOGFWD(info, "State watching function: Unexpected event: '%d' -- '%d' ", type, state);
+ }
+}
+
+
+void* /* watcherContext */
+ZKFacade::registerWatcher(const NodeChangedWatcherSP& watcher) {
+
+ UniqueLock lock(_watchersMutex);
+ boost::shared_ptr<ZKWatcher> zkWatcher(new ZKWatcher(shared_from_this(), watcher));
+ _watchers[zkWatcher.get()] = zkWatcher;
+ return zkWatcher.get();
+}
+
+
+boost::shared_ptr<ZKFacade::ZKWatcher>
+ZKFacade::unregisterWatcher(void* watcherContext) {
+ UniqueLock lock(_watchersMutex);
+
+ WatchersMap::iterator i = _watchers.find(watcherContext);
+ if (i == _watchers.end()) {
+ return boost::shared_ptr<ZKWatcher>();
+ } else {
+ boost::shared_ptr<ZKWatcher> result = i->second;
+ _watchers.erase(i);
+ return result;
+ }
+}
+
+void
+ZKFacade::invokeWatcher(void* watcherContext) {
+ try {
+ boost::shared_ptr<ZKWatcher> watcher = unregisterWatcher(watcherContext);
+
+ if (!_watchersEnabled)
+ return;
+
+ if (watcher) {
+ (*watcher->_nodeChangedWatcher)();
+ } else {
+ LOGFWD(error, "Invoke called on expired watcher.");
+ }
+ } catch(...) {
+ _exceptionRethrower->store(boost::current_exception());
+ }
+}
+
+/********** End live watchers ***************************************/
+
+
+ZKFacade::ZKFacade(const std::string& zkservers,
+ const boost::shared_ptr<ExceptionRethrower> &exceptionRethrower)
+ :_retriesEnabled(true),
+ _watchersEnabled(true),
+ _exceptionRethrower(exceptionRethrower),
+ _zhandle(zookeeper_init(zkservers.c_str(),
+ &ZKFacade::stateWatchingFun,
+ _zkSessionTimeOut,
+ 0, //clientid,
+ this, //context,
+ 0)) //flags
+{
+ if (!_zhandle) {
+ BOOST_THROW_EXCEPTION(ZKFailedConnecting());
+ }
+}
+
+ZKFacade::~ZKFacade() {
+ disableRetries();
+ _watchersEnabled = false;
+
+ boost::thread shutdownCaller(zookeeper_close, _zhandle);
+ if (shutdownCaller.timed_join(boost::posix_time::seconds(120))) {
+ LOGFWD(debug, "Zookeeper connection closed successfully.");
+ } else {
+ LOGFWD(info, "Timed out waiting for the zookeeper connection to shut down.");
+ abort();
+ }
+}
+
+const std::string
+ZKFacade::getString(const Path& path) {
+ Buffer buffer(getData(path));
+ return std::string(buffer.begin(), buffer.end());
+}
+
+const Move<Buffer>
+ZKFacade::getData(const Path& path) {
+ RetryController retryController(this);
+ try {
+ Buffer buffer(_maxDataSize);
+ int bufferSize = _maxDataSize;
+
+ const int watchIsOff = 0;
+ do {
+ Stat stat;
+ bufferSize = _maxDataSize;
+
+ retryController(
+ zoo_get(_zhandle, path.string().c_str(), watchIsOff,
+ &*buffer.begin(),
+ &bufferSize, //in & out
+ &stat));
+ } while(retryController.shouldRetry());
+
+ retryController.throwIfError();
+
+ buffer.resize(bufferSize);
+ return move(buffer);
+
+ } catch(boost::exception& e) {
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+const Move<Buffer>
+ZKFacade::getData(const Path& path, const NodeChangedWatcherSP& watcher) {
+ void* watcherContext = registerWatcher(watcher);
+ RetryController retryController(this);
+
+ try {
+ Buffer buffer(_maxDataSize);
+ int bufferSize = _maxDataSize;
+
+ do {
+ Stat stat;
+ bufferSize = _maxDataSize;
+
+ retryController(
+ zoo_wget(_zhandle, path.string().c_str(),
+ &ZKWatcher::watcherFn, watcherContext,
+ &*buffer.begin(),
+ &bufferSize, //in & out
+ &stat));
+ } while(retryController.shouldRetry());
+
+ retryController.throwIfError();
+
+ buffer.resize(bufferSize);
+ return move(buffer);
+
+ } catch(boost::exception& e) {
+ unregisterWatcher(watcherContext);
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+void
+ZKFacade::setData(const Path& path, const Buffer& buffer, bool mustExist) {
+ return setData(path, &*buffer.begin(), buffer.size(), mustExist);
+}
+
+void
+ZKFacade::setData(const Path& path, const char* buffer, size_t length, bool mustExist) {
+ assert (length < _maxDataSize);
+
+ try {
+ if (mustExist || hasNode(path))
+ setDataForExistingFile(*this, path, buffer, length, _zhandle);
+ else
+ setDataForNewFile(*this, path, buffer, length, _zhandle, 0);
+ } catch(boost::exception& e) {
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+const Path
+ZKFacade::createSequenceNode(const Path& path, const char* buffer, size_t length) {
+ assert (length < _maxDataSize);
+
+ int createFlags = ZOO_SEQUENCE;
+ return setDataForNewFile(*this, path, buffer, length, _zhandle, createFlags);
+}
+
+bool
+ZKFacade::hasNode(const Path& path) {
+ try {
+ RetryController retryController(this);
+ do {
+ Stat stat;
+ const int noWatch = 0;
+ retryController(
+ zoo_exists(_zhandle, path.string().c_str(), noWatch, &stat));
+ } while(retryController.shouldRetry());
+
+ switch(retryController._lastStatus) {
+ case ZNONODE:
+ return false;
+ case ZOK:
+ return true;
+ default:
+ retryController.throwIfError();
+ //this should never happen:
+ assert(false);
+ return false;
+ }
+
+ } catch (boost::exception &e) {
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+bool
+ZKFacade::hasNode(const Path& path, const NodeChangedWatcherSP& watcher) {
+ void* watcherContext = registerWatcher(watcher);
+ try {
+ RetryController retryController(this);
+ do {
+ Stat stat;
+ retryController(
+ zoo_wexists(_zhandle, path.string().c_str(),
+ &ZKWatcher::watcherFn, watcherContext,
+ &stat));
+ } while(retryController.shouldRetry());
+
+ switch(retryController._lastStatus) {
+ case ZNONODE:
+ return false;
+ case ZOK:
+ return true;
+ default:
+ retryController.throwIfError();
+ //this should never happen:
+ assert(false);
+ return false;
+ }
+
+ } catch (boost::exception &e) {
+ unregisterWatcher(watcherContext);
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+void
+ZKFacade::addEphemeralNode(const Path& path) {
+ try {
+ setDataForNewFile(*this, path, "", 0, _zhandle, ZOO_EPHEMERAL);
+ } catch(const ZKNodeExistsException& e) {
+ remove(path);
+ addEphemeralNode(path);
+ } catch (boost::exception& e) {
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+void
+ZKFacade::remove(const Path& path) {
+ namespace ll = boost::lambda;
+
+ std::vector< std::string > children = getChildren(path);
+ if (!children.empty()) {
+ std::for_each(children.begin(), children.end(),
+ ll::bind(&ZKFacade::remove, this,
+ ll::ret<Path>(path / ll::_1)));
+ }
+
+ try {
+ RetryController retryController(this);
+ do {
+ int ignoreVersion = -1;
+
+ retryController(
+ zoo_delete(_zhandle, path.string().c_str(),
+ ignoreVersion));
+ } while (retryController.shouldRetry());
+
+ if (retryController._lastStatus != ZNONODE)
+ retryController.throwIfError();
+
+ } catch(boost::exception& e) {
+ e <<errorinfo::Path(path);
+ }
+}
+
+void
+ZKFacade::removeIfExists(const Path& path) {
+ try {
+ if (hasNode(path)) {
+ remove(path);
+ }
+ } catch (const ZKNodeDoesNotExistsException& e) {
+ //someone else removed it concurrently, not a problem.
+ }
+}
+
+void
+ZKFacade::retainOnly(const Path& path, const std::vector<std::string>& childrenToPreserve) {
+ typedef std::vector<std::string> Children;
+
+ Children current = getChildren(path);
+ std::sort(current.begin(), current.end());
+
+ Children toPreserveSorted(childrenToPreserve);
+ std::sort(toPreserveSorted.begin(), toPreserveSorted.end());
+
+ namespace ll = boost::lambda;
+ std::set_difference(current.begin(), current.end(),
+ toPreserveSorted.begin(), toPreserveSorted.end(),
+ boost::make_function_output_iterator(
+ ll::bind(&ZKFacade::remove, this,
+ ll::ret<Path>(path / ll::_1))));
+}
+
+std::vector< std::string >
+ZKFacade::getChildren(const Path& path) {
+ try {
+ RetryController retryController(this);
+ String_vector children;
+ do {
+ const bool watch = false;
+ retryController(
+ zoo_get_children(_zhandle, path.string().c_str(), watch, &children));
+ } while(retryController.shouldRetry());
+
+ retryController.throwIfError();
+
+ DeallocateZKStringVectorGuard deallocateGuard(children);
+
+ typedef std::vector<std::string> ResultType;
+ ResultType result;
+ result.reserve(children.count);
+
+ std::copy(children.data, children.data + children.count,
+ std::back_inserter(result));
+
+ return result;
+ } catch (boost::exception& e) {
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+std::vector< std::string >
+ZKFacade::getChildren(const Path& path, const NodeChangedWatcherSP& watcher) {
+ void* watcherContext = registerWatcher(watcher);
+
+ try {
+ RetryController retryController(this);
+ String_vector children;
+ do {
+ retryController(
+ zoo_wget_children(_zhandle, path.string().c_str(),
+ &ZKWatcher::watcherFn, watcherContext,
+ &children));
+ } while(retryController.shouldRetry());
+
+ retryController.throwIfError();
+
+ DeallocateZKStringVectorGuard deallocateGuard(children);
+
+ typedef std::vector<std::string> ResultType;
+ ResultType result;
+ result.reserve(children.count);
+
+ std::copy(children.data, children.data + children.count,
+ std::back_inserter(result));
+
+ return result;
+ } catch (boost::exception& e) {
+ unregisterWatcher(watcherContext);
+ e <<errorinfo::Path(path);
+ throw;
+ }
+}
+
+
+void
+ZKFacade::disableRetries() {
+ _retriesEnabled = false;
+}
+
+void
+filedistribution::setupZooKeeperLogging() {
+ std::string filename(vespa::Defaults::vespaHome());
+ filename.append("/tmp/zookeeper.log");
+ FILE* file = std::fopen(filename.c_str(), "w");
+ if (file == NULL) {
+ std::cerr <<"Could not open file " <<filename << std::endl;
+ } else {
+ zoo_set_log_stream(file);
+ }
+
+ zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
+}
+
+const char*
+ZKGenericException::what() const throw() {
+ switch(_zkStatus) {
+ //System errors
+ case ZRUNTIMEINCONSISTENCY:
+ return "Zookeeper: A runtime inconsistency was found(ZRUNTIMEINCONSISTENCY)";
+ case ZDATAINCONSISTENCY:
+ return "Zookeeper: A data inconsistency was found(ZDATAINCONSISTENCY)";
+ case ZCONNECTIONLOSS:
+ return "Zookeeper: Connection to the server has been lost(ZCONNECTIONLOSS)";
+ case ZMARSHALLINGERROR:
+ return "Zookeeper: Error while marshalling or unmarshalling data(ZMARSHALLINGERROR)";
+ case ZUNIMPLEMENTED:
+ return "Zookeeper: Operation is unimplemented(ZUNIMPLEMENTED)";
+ case ZOPERATIONTIMEOUT:
+ return "Zookeeper: Operation timeout(ZOPERATIONTIMEOUT)";
+ case ZBADARGUMENTS:
+ return "Zookeeper: Invalid arguments(ZBADARGUMENTS)";
+ case ZINVALIDSTATE:
+ return "Zookeeper: The connection with the zookeeper servers timed out(ZINVALIDSTATE).";
+
+ //API errors
+ case ZNONODE:
+ return "Zookeeper: Node does not exist(ZNONODE)";
+ case ZNOAUTH:
+ return "Zookeeper: Not authenticated(ZNOAUTH)";
+ case ZBADVERSION:
+ return "Zookeeper: Version conflict(ZBADVERSION)";
+ case ZNOCHILDRENFOREPHEMERALS:
+ return "Zookeeper: Ephemeral nodes may not have children(ZNOCHILDRENFOREPHEMERALS)";
+ case ZNODEEXISTS:
+ return "Zookeeper: The node already exists(ZNODEEXISTS)";
+ case ZNOTEMPTY:
+ return "Zookeeper: The node has children(ZNOTEMPTY)";
+ case ZSESSIONEXPIRED:
+ return "Zookeeper: The session has been expired by the server(ZSESSIONEXPIRED)";
+ case ZINVALIDCALLBACK:
+ return "Zookeeper: Invalid callback specified(ZINVALIDCALLBACK)";
+ case ZINVALIDACL:
+ return "Zookeeper: Invalid ACL specified(ZINVALIDACL)";
+ case ZAUTHFAILED:
+ return "Zookeeper: Client authentication failed(ZAUTHFAILED)";
+ case ZCLOSING:
+ return "Zookeeper: ZooKeeper is closing(ZCLOSING)";
+ case ZNOTHING:
+ return "Zookeeper: No server responses to process(ZNOTHING)";
+ default:
+ std::cerr <<"In ZKGenericException::what(): Invalid error code " <<_zkStatus <<std::endl;
+ return "Zookeeper: Invalid error code.";
+ }
+}
+
+const std::string
+filedistribution::diagnosticUserLevelMessage(const ZKException& exception) {
+ const char* indent = " ";
+ std::ostringstream message;
+ message <<exception.what();
+
+ const errorinfo::Path::value_type* path = boost::get_error_info<errorinfo::Path>(exception);
+ if (path) {
+ message <<std::endl <<indent <<"Path: " <<*path;
+ }
+ return message.str();
+}
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.h b/filedistribution/src/vespa/filedistribution/model/zkfacade.h
new file mode 100644
index 00000000000..7db6b53cc18
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.h
@@ -0,0 +1,135 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <string>
+#include <vector>
+#include <boost/noncopyable.hpp>
+#include <boost/filesystem/path.hpp>
+#include <boost/signals2.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+#include <vespa/filedistribution/common/buffer.h>
+#include <vespa/filedistribution/common/exception.h>
+#include <vespa/filedistribution/common/exceptionrethrower.h>
+
+struct _zhandle;
+typedef _zhandle zhandle_t;
+
+namespace filedistribution {
+
+namespace errorinfo {
+typedef boost::error_info<struct tag_Path, boost::filesystem::path> Path;
+}
+
+class ZKException : public Exception {
+protected:
+ ZKException() {}
+};
+
+struct ZKNodeDoesNotExistsException : public ZKException {
+ const char* what() const throw() {
+ return "Zookeeper: The node does not exist(ZNONODE).";
+ }
+};
+
+struct ZKNodeExistsException : public ZKException {
+ const char* what() const throw() {
+ return "Zookeeper: The node already exists(ZNODEEXISTS).";
+ }
+};
+
+struct ZKGenericException : public ZKException {
+ const int _zkStatus;
+ ZKGenericException(int zkStatus)
+ :_zkStatus(zkStatus)
+ {}
+
+ const char* what() const throw();
+};
+
+struct ZKFailedConnecting : public ZKException {
+ const char* what() const throw() {
+ return "Zookeeper: Failed connecting to the zookeeper servers.";
+ }
+};
+
+class ZKSessionExpired : public ZKException {};
+
+const std::string
+diagnosticUserLevelMessage(const ZKException& zk);
+
+
+
+class ZKFacade : boost::noncopyable, public boost::enable_shared_from_this<ZKFacade> {
+ volatile bool _retriesEnabled;
+ volatile bool _watchersEnabled;
+
+ boost::shared_ptr<ExceptionRethrower> _exceptionRethrower;
+ zhandle_t* _zhandle;
+ const static int _zkSessionTimeOut = 30 * 1000;
+ const static size_t _maxDataSize = 1024 * 1024;
+
+ class ZKWatcher;
+ static void stateWatchingFun(zhandle_t*, int type, int state, const char* path, void* context);
+public:
+ typedef boost::shared_ptr<ZKFacade> SP;
+
+ /* Lifetime is managed by ZKFacade.
+ Derived classes should only contain weak_ptrs to other objects
+ to avoid linking their lifetime to the ZKFacade lifetime.
+ */
+ class NodeChangedWatcher : boost::noncopyable {
+ public:
+ virtual void operator()() = 0;
+ virtual ~NodeChangedWatcher() {};
+ };
+
+ typedef boost::shared_ptr<NodeChangedWatcher> NodeChangedWatcherSP;
+ typedef boost::filesystem::path Path;
+
+ ZKFacade(const std::string& zkservers, const boost::shared_ptr<ExceptionRethrower> &);
+ ~ZKFacade();
+
+ bool hasNode(const Path&);
+ bool hasNode(const Path&, const NodeChangedWatcherSP&);
+
+ const std::string getString(const Path&);
+ const Move<Buffer> getData(const Path&); //throws ZKNodeDoesNotExistsException
+ //if watcher is specified, it will be set even if the node does not exists
+ const Move<Buffer> getData(const Path&, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException
+
+ //Parent path must exist
+ void setData(const Path&, const Buffer& buffer, bool mustExist = false);
+ void setData(const Path&, const char* buffer, size_t length, bool mustExist = false);
+
+ const Path createSequenceNode(const Path&, const char* buffer, size_t length);
+
+ void remove(const Path&); //throws ZKNodeDoesNotExistsException
+ void removeIfExists(const Path&);
+
+ void retainOnly(const Path&, const std::vector<std::string>& children);
+
+ void addEphemeralNode(const Path& path);
+ std::vector<std::string> getChildren(const Path& path);
+ std::vector<std::string> getChildren(const Path& path, const NodeChangedWatcherSP&); //throws ZKNodeDoesNotExistsException
+
+ //only for use by shutdown code.
+ void disableRetries();
+ bool retriesEnabled() {
+ return _retriesEnabled;
+ }
+
+private:
+ void* registerWatcher(const NodeChangedWatcherSP &); //returns watcherContext
+ boost::shared_ptr<ZKWatcher> unregisterWatcher(void* watcherContext);
+ void invokeWatcher(void* watcherContext);
+
+ boost::mutex _watchersMutex;
+ typedef std::map<void*, boost::shared_ptr<ZKWatcher> > WatchersMap;
+ WatchersMap _watchers;
+};
+
+void setupZooKeeperLogging();
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
new file mode 100644
index 00000000000..3483a4eb359
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.cpp
@@ -0,0 +1,298 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "filedistributionmodel.h"
+
+#include <ostream>
+#include <algorithm>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/foreach.hpp>
+
+#include "zkfacade.h"
+#include "zkfiledbmodel.h"
+#include "deployedfilestodownload.h"
+#include <vespa/filedistribution/common/logfwd.h>
+
+namespace fs = boost::filesystem;
+
+using filedistribution::ZKFileDBModel;
+
+namespace {
+
+fs::path
+createPath(const std::string& fileReference) {
+ return filedistribution::ZKFileDBModel::_fileDBPath / fileReference;
+}
+
+void
+createNode(const fs::path & path, filedistribution::ZKFacade& zk) {
+ if (!zk.hasNode(path))
+ zk.setData(path, "", 0);
+}
+
+bool
+isEntryForHost(const std::string& host, const std::string& peerEntry) {
+ return host.size() < peerEntry.size() &&
+ std::equal(host.begin(), host.end(), peerEntry.begin()) &&
+ peerEntry[host.size()] == ZKFileDBModel::_peerEntrySeparator;
+}
+
+std::vector<std::string>
+getSortedChildren(filedistribution::ZKFacade& zk, const ZKFileDBModel::Path& path) {
+ std::vector<std::string> children = zk.getChildren(path);
+ std::sort(children.begin(), children.end());
+ return children;
+}
+
+} //anonymous namespace
+
+const ZKFileDBModel::Path ZKFileDBModel::_root = "/vespa/filedistribution";
+const ZKFileDBModel::Path ZKFileDBModel::_fileDBPath = _root / "files";
+const ZKFileDBModel::Path ZKFileDBModel::_hostsPath = _root / "hosts";
+
+bool
+ZKFileDBModel::hasFile(const std::string& fileReference) {
+ return _zk->hasNode(createPath(fileReference));
+}
+
+void
+ZKFileDBModel::addFile(const std::string& fileReference, const Buffer& buffer) {
+ return _zk->setData(createPath(fileReference), buffer);
+}
+
+filedistribution::Move<filedistribution::Buffer>
+ZKFileDBModel::getFile(const std::string& fileReference) {
+ try {
+ return _zk->getData(createPath(fileReference));
+ } catch(const ZKNodeDoesNotExistsException&) {
+ throw FileDoesNotExistException();
+ }
+}
+
+void
+ZKFileDBModel::setDeployedFilesToDownload(
+ const std::string& hostName,
+ const std::string& appId,
+ const std::vector<std::string>& files) {
+ DeployedFilesToDownload d(_zk.get());
+ d.setDeployedFilesToDownload(hostName, appId, files);
+}
+
+void
+ZKFileDBModel::cleanDeployedFilesToDownload(
+ const std::vector<std::string>& hostsToPreserve,
+ const std::string& appId) {
+
+ std::vector<std::string> allHosts = getHosts();
+ std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end());
+
+ for (auto & host : allHosts) {
+ Path hostPath = _hostsPath / host;
+ try {
+ removeLegacyDeployFileNodes(hostPath);
+ // If this host is NOT part of hosts to deploy to
+ if (toPreserve.find(host) == toPreserve.end()) {
+ removeDeployFileNodes(hostPath, appId);
+ if (canRemoveHost(hostPath, appId)) {
+ _zk->remove(hostPath);
+ }
+ }
+ } catch (const ZKNodeDoesNotExistsException& e) {
+ LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str());
+ }
+ }
+}
+
+void
+ZKFileDBModel::removeDeploymentsThatHaveDifferentApplicationId(
+ const std::vector<std::string>& hostsToPreserve,
+ const std::string& appId) {
+
+ std::vector<std::string> allHosts = getHosts();
+ std::set<std::string> toPreserve(hostsToPreserve.begin(), hostsToPreserve.end());
+
+ for (auto & host : allHosts) {
+ Path hostPath = _hostsPath / host;
+ try {
+ if (toPreserve.find(host) != toPreserve.end()) {
+ removeNonApplicationFiles(hostPath, appId);
+ }
+ } catch (const ZKNodeDoesNotExistsException& e) {
+ LOGFWD(debug, "Host '%s' changed. Not touching", hostPath.string().c_str());
+ }
+ }
+}
+
+
+// Delete files which do not belong to this application.
+void
+ZKFileDBModel::removeNonApplicationFiles(const Path & hostPath, const std::string& appId)
+{
+ std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
+ for (auto & deployNode : deployNodes) {
+ Path deployNodePath = hostPath / deployNode;
+ std::string applicationId(readApplicationId(*_zk, deployNodePath));
+ if (appId != applicationId) {
+ _zk->remove(deployNodePath);
+ }
+ }
+}
+
+
+void
+ZKFileDBModel::removeLegacyDeployFileNodes(const Path & hostPath)
+{
+ std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
+ for (auto & deployNode : deployNodes) {
+ Path deployNodePath = hostPath / deployNode;
+ std::string applicationId(readApplicationId(*_zk, deployNodePath));
+ size_t numParts = std::count(applicationId.begin(), applicationId.end(), ':');
+ // If we have an id with 3 colons, it is a legacy id and can be deleted.
+ if (numParts == 3) {
+ _zk->remove(deployNodePath);
+ }
+ }
+}
+
+void
+ZKFileDBModel::removeDeployFileNodes(const Path & hostPath, const std::string& appId) {
+ std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
+ for (auto & deployNode : deployNodes) {
+ Path deployNodePath = hostPath / deployNode;
+ std::string applicationId(readApplicationId(*_zk, deployNodePath));
+ if (appId == applicationId) {
+ _zk->remove(deployNodePath);
+ }
+ }
+}
+
+bool
+ZKFileDBModel::canRemoveHost(const Path & hostPath, const std::string& appId) {
+ std::vector<std::string> deployNodes = _zk->getChildren(hostPath);
+ for (auto & deployNode : deployNodes) {
+ Path deployNodePath = hostPath / deployNode;
+ std::string applicationId(readApplicationId(*_zk, deployNodePath));
+ if (appId != applicationId) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::vector<std::string>
+ZKFileDBModel::getHosts() {
+ try {
+ return _zk->getChildren(_hostsPath);
+ } catch(ZKNodeDoesNotExistsException&) {
+ LOGFWD(debug, "No files to be distributed.");
+ return std::vector<std::string>();
+ }
+}
+
+namespace {
+const ZKFileDBModel::Progress::value_type notStarted = 101;
+};
+
+//TODO: Refactor
+ZKFileDBModel::HostStatus
+ZKFileDBModel::getHostStatus(const std::string& hostName) {
+ typedef std::vector<std::string> PeerEntries;
+
+ DeployedFilesToDownload d(_zk.get());
+ DeployedFilesToDownload::FileReferences filesToDownload = d.getLatestDeployedFilesToDownload(hostName);
+
+ HostStatus hostStatus;
+ hostStatus._state = HostStatus::notStarted;
+ hostStatus._numFilesToDownload = filesToDownload.size();
+ hostStatus._numFilesFinished = 0;
+
+ BOOST_FOREACH(std::string file, filesToDownload) {
+ Path path = getPeersPath(file);
+
+ const PeerEntries peerEntries = getSortedChildren(*_zk, path);
+ PeerEntries::const_iterator candidate =
+ std::lower_bound(peerEntries.begin(), peerEntries.end(), hostName);
+
+ if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate)) {
+ char fileProgressPercentage = getProgress(path / (*candidate));
+ if (fileProgressPercentage == 100) {
+ hostStatus._numFilesFinished++;
+ } else if (fileProgressPercentage != notStarted) {
+ hostStatus._state = HostStatus::inProgress;
+ }
+
+ candidate++;
+ if (candidate != peerEntries.end() && isEntryForHost(hostName, *candidate))
+ BOOST_THROW_EXCEPTION(InvalidHostStatusException());
+ }
+ }
+
+
+ if (hostStatus._numFilesToDownload == hostStatus._numFilesFinished) {
+ hostStatus._state = HostStatus::finished;
+ }
+
+ return hostStatus;
+}
+
+void
+ZKFileDBModel::cleanFiles(
+ const std::vector<std::string>& filesToPreserve) {
+ _zk->retainOnly(_fileDBPath, filesToPreserve);
+}
+
+ZKFileDBModel::ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk)
+ : _zk(zk)
+{
+ createNode(_root, *_zk);
+ createNode(_fileDBPath, *_zk);
+ createNode(_hostsPath, *_zk);
+}
+
+
+char
+ZKFileDBModel::getProgress(const Path& path) {
+ try {
+ Buffer buffer(_zk->getData(path));
+ if (buffer.size() == 1)
+ return buffer[0];
+ else if (buffer.size() == 0)
+ return 0;
+ else {
+ throw boost::enable_current_exception(InvalidProgressException())
+ <<errorinfo::Path(path);
+ }
+ } catch (ZKNodeDoesNotExistsException& e) {
+ //progress information deleted
+ return notStarted;
+ }
+}
+
+ZKFileDBModel::Progress
+ZKFileDBModel::getProgress(const std::string& fileReference,
+ const std::vector<std::string>& hostsSortedAscending) {
+ Path path = getPeersPath(fileReference);
+
+ Progress progress;
+ progress.reserve(hostsSortedAscending.size());
+
+ typedef std::vector<std::string> PeerEntries;
+ const PeerEntries peerEntries = getSortedChildren(*_zk, path);
+
+ PeerEntries::const_iterator current = peerEntries.begin();
+ BOOST_FOREACH(const std::string& host, hostsSortedAscending) {
+ PeerEntries::const_iterator candidate =
+ std::lower_bound(current, peerEntries.end(), host);
+
+ ZKFileDBModel::Progress::value_type hostProgress = notStarted;
+ if (candidate != peerEntries.end()) {
+ current = candidate;
+ if (isEntryForHost(host, *current))
+ hostProgress = getProgress(path / (*candidate));
+ }
+ progress.push_back(hostProgress);
+ }
+ return progress;
+}
+
+filedistribution::FileDBModel::~FileDBModel() {}
diff --git a/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
new file mode 100644
index 00000000000..cf180f4c780
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/model/zkfiledbmodel.h
@@ -0,0 +1,59 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include "filedistributionmodel.h"
+#include "zkfacade.h"
+
+namespace filedistribution {
+
+class ZKFileDBModel : public FileDBModel {
+public:
+ typedef boost::filesystem::path Path;
+private:
+ const boost::shared_ptr<ZKFacade> _zk;
+ char getProgress(const Path& path);
+ void removeDeployFileNodes(const Path& hostPath, const std::string& appId);
+ void removeLegacyDeployFileNodes(const Path& hostPath);
+ bool canRemoveHost(const Path& hostPath, const std::string& appId);
+public:
+ const static Path _root;
+ const static Path _fileDBPath;
+ const static Path _hostsPath;
+
+ const static char _peerEntrySeparator = ':';
+
+ Path getPeersPath(const std::string& fileReference) {
+ return _fileDBPath / fileReference;
+ }
+
+ //overrides
+ bool hasFile(const std::string& fileReference);
+ void addFile(const std::string& fileReference, const Buffer& buffer);
+ Move<Buffer> getFile(const std::string& fileReference);
+ void cleanFiles(const std::vector<std::string>& filesToPreserve);
+
+ void setDeployedFilesToDownload(const std::string& hostName,
+ const std::string & appId,
+ const std::vector<std::string> & files);
+ void cleanDeployedFilesToDownload(
+ const std::vector<std::string> & hostsToPreserve,
+ const std::string& appId);
+ void removeDeploymentsThatHaveDifferentApplicationId(
+ const std::vector<std::string> & hostsToPreserve,
+ const std::string& appId);
+ void removeNonApplicationFiles(
+ const Path & hostPath,
+ const std::string& appId);
+ std::vector<std::string> getHosts();
+ HostStatus getHostStatus(const std::string& hostName);
+
+ ZKFileDBModel(const boost::shared_ptr<ZKFacade>& zk);
+
+ Progress getProgress(const std::string& fileReference,
+ const std::vector<std::string>& hostsSortedAscending);
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt
new file mode 100644
index 00000000000..d3691ef4d84
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/rpc/CMakeLists.txt
@@ -0,0 +1,6 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_filedistributorrpc STATIC
+ SOURCES
+ filedistributorrpc.cpp
+ DEPENDS
+)
diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
new file mode 100644
index 00000000000..e58165f1f0f
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.cpp
@@ -0,0 +1,279 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include "filedistributorrpc.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/optional.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/exception/diagnostic_information.hpp>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".filedistributorrpc");
+
+#include <vespa/fnet/frt/frt.h>
+#include <vespa/frtstream/frtserverstream.h>
+#include <map>
+
+#include "fileprovider.h"
+#include <vespa/filedistribution/model/filedbmodel.h>
+
+using filedistribution::FileDistributorRPC;
+namespace ll = boost::lambda;
+
+namespace {
+typedef boost::lock_guard<boost::mutex> LockGuard;
+
+struct RPCErrorCodes {
+ const static uint32_t baseErrorCode = 0x10000;
+ const static uint32_t baseFileProviderErrorCode = baseErrorCode + 0x1000;
+
+ const static uint32_t unknownError = baseErrorCode + 1;
+};
+
+class QueuedRequests {
+ bool _shuttingDown;
+
+ boost::mutex _mutex;
+ typedef std::multimap<std::string, FRT_RPCRequest*> Map;
+ Map _queuedRequests;
+
+ template <class FUNC>
+ void returnAnswer(const std::string& fileReference, FUNC func) {
+ LockGuard guard(_mutex);
+
+ typedef Map::iterator iterator;
+ std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference);
+
+ BOOST_FOREACH( const Map::value_type& request, range) {
+ LOG(info, "Returning earlier enqueued request for file reference '%s'.", request.first.c_str());
+ func(*request.second);
+ request.second->Return();
+ }
+
+ _queuedRequests.erase(range.first, range.second);
+ }
+
+ struct DownloadFinished {
+ const std::string& _path;
+
+ void operator()(FRT_RPCRequest& request) {
+ LOG(info, "Download finished: '%s'", _path.c_str());
+ frtstream::FrtServerStream requestHandler(&request);
+ requestHandler <<_path;
+ }
+
+ DownloadFinished(const std::string& path)
+ :_path(path)
+ {}
+ };
+
+ struct DownloadFailed {
+ filedistribution::FileProvider::FailedDownloadReason _reason;
+
+ void operator()(FRT_RPCRequest& request) {
+ LOG(info, "Download failed: '%d'", _reason);
+ request.SetError(RPCErrorCodes::baseFileProviderErrorCode + _reason, "Download failed");
+ }
+
+ DownloadFailed(filedistribution::FileProvider::FailedDownloadReason reason)
+ :_reason(reason)
+ {}
+ };
+
+public:
+ QueuedRequests()
+ :_shuttingDown(false)
+ {}
+
+ void enqueue(const std::string& fileReference, FRT_RPCRequest* request) {
+ LockGuard guard(_mutex);
+
+ if (_shuttingDown) {
+ LOG(info, "Shutdown: Aborting request for file reference '%s'.", fileReference.c_str());
+ abort(request);
+ } else {
+ _queuedRequests.insert(std::make_pair(fileReference, request));
+ }
+ }
+
+ void abort(FRT_RPCRequest* request) {
+ request->SetError(FRTE_RPC_ABORT);
+ request->Return();
+ }
+
+ void dequeue(const std::string& fileReference, FRT_RPCRequest* request) {
+ LockGuard guard(_mutex);
+
+ typedef Map::iterator iterator;
+ std::pair<iterator, iterator> range = _queuedRequests.equal_range(fileReference);
+
+ iterator candidate = std::find(range.first, range.second,
+ std::pair<const std::string, FRT_RPCRequest*>(fileReference, request));
+
+ if (candidate != range.second)
+ _queuedRequests.erase(candidate);
+ }
+
+ void downloadFinished(const std::string& fileReference,
+ const boost::filesystem::path& path) {
+
+ DownloadFinished handler(path.string());
+ returnAnswer(fileReference, handler);
+ }
+
+ void downloadFailed(const std::string& fileReference,
+ filedistribution::FileProvider::FailedDownloadReason reason) {
+
+ DownloadFailed handler(reason);
+ returnAnswer(fileReference, handler);
+ }
+
+ void shutdown() {
+ LockGuard guard(_mutex);
+ _shuttingDown = true;
+
+ BOOST_FOREACH( const Map::value_type& request, _queuedRequests) {
+ LOG(info, "Shutdown: Aborting earlier enqueued request for file reference '%s'.", request.first.c_str());
+ abort(request.second);
+ }
+ _queuedRequests.erase(_queuedRequests.begin(), _queuedRequests.end());
+ }
+};
+
+} //anonymous namespace
+
+class FileDistributorRPC::Server : public FRT_Invokable, boost::noncopyable {
+ public:
+ boost::shared_ptr<FileProvider> _fileProvider;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
+
+ QueuedRequests _queuedRequests;
+
+ boost::signals2::scoped_connection _downloadCompletedConnection;
+ boost::signals2::scoped_connection _downloadFailedConnection;
+
+ void queueRequest(const std::string& fileReference, FRT_RPCRequest* request);
+ void defineMethods();
+
+ Server(int listen_port,
+ const boost::shared_ptr<FileProvider>& provider);
+ void start(const boost::shared_ptr<FileDistributorRPC> parent);
+ ~Server();
+
+ void waitFor(FRT_RPCRequest*);
+};
+
+FileDistributorRPC::
+Server::Server(int listen_port,
+ const boost::shared_ptr<filedistribution::FileProvider>& provider)
+ :_fileProvider(provider),
+ _supervisor(new FRT_Supervisor())
+{
+ defineMethods();
+ _supervisor->Listen(listen_port);
+ _supervisor->Start();
+}
+
+
+FileDistributorRPC::
+Server::~Server() {
+ _queuedRequests.shutdown();
+
+ const bool waitForFinished = true;
+ _supervisor->ShutDown(waitForFinished);
+}
+
+void
+FileDistributorRPC::Server::start(const boost::shared_ptr<FileDistributorRPC> parent) {
+ _downloadCompletedConnection =
+ _fileProvider->downloadCompleted().connect(FileProvider::DownloadCompletedSignal::slot_type(
+ ll::bind(&QueuedRequests::downloadFinished, &_queuedRequests, ll::_1, ll::_2)).
+ track(parent));
+
+ _downloadFailedConnection =
+ _fileProvider->downloadFailed().connect(FileProvider::DownloadFailedSignal::slot_type(
+ ll::bind(&QueuedRequests::downloadFailed, &_queuedRequests, ll::_1, ll::_2)).
+ track(parent));
+
+
+}
+
+void
+FileDistributorRPC::
+Server::queueRequest(const std::string& fileReference, FRT_RPCRequest* request) {
+ _queuedRequests.enqueue( fileReference, request );
+ try {
+ _fileProvider->downloadFile(fileReference);
+ } catch(...) {
+ _queuedRequests.dequeue(fileReference, request);
+ throw;
+ }
+}
+
+void
+FileDistributorRPC::
+Server::defineMethods() {
+ const bool instant = true;
+ FRT_ReflectionBuilder builder(_supervisor.get());
+ builder.DefineMethod("waitFor", "s", "s", instant,
+ FRT_METHOD(Server::waitFor), this);
+}
+
+void
+FileDistributorRPC::
+Server::waitFor(FRT_RPCRequest* request) {
+ try {
+ frtstream::FrtServerStream requestHandler(request);
+ std::string fileReference;
+ requestHandler >> fileReference;
+ boost::optional<boost::filesystem::path> path
+ = _fileProvider->getPath(fileReference);
+ if (path) {
+ LOG(debug, "Returning request for file reference '%s'.", fileReference.c_str());
+ requestHandler << path->string();
+ } else {
+ LOG(debug, "Enqueuing file request for file reference '%s'.", fileReference.c_str());
+ request->Detach();
+ queueRequest(fileReference, request);
+ }
+ } catch (const FileDoesNotExistException&) {
+ LOG(warning, "Recieved a request for a file reference that does not exist in zookeeper.");
+ request->SetError(RPCErrorCodes::baseFileProviderErrorCode + FileProvider::FileReferenceDoesNotExist,
+ "No such file reference");
+ request->Return();
+ } catch (const std::exception& e) {
+ LOG(error, "An exception occurred while calling the rpc method waitFor:%s",
+ boost::diagnostic_information(e).c_str());
+ request->SetError(RPCErrorCodes::unknownError, boost::diagnostic_information(e).c_str());
+ request->Return(); //the request might be detached.
+ }
+}
+
+FileDistributorRPC::FileDistributorRPC(const std::string& connectionSpec,
+ const boost::shared_ptr<filedistribution::FileProvider>& provider)
+ :_server(new Server(get_port(connectionSpec), provider))
+{}
+
+void
+FileDistributorRPC::start()
+{
+ _server->start(shared_from_this());
+}
+
+int
+FileDistributorRPC::get_port(const std::string &spec)
+{
+ const char *port = (spec.data() + spec.size());
+ while ((port > spec.data()) && (port[-1] >= '0') && (port[-1] <= '9')) {
+ --port;
+ }
+ return atoi(port);
+}
+
+filedistribution::FileDistributorRPC::~FileDistributorRPC()
+{
+ LOG(debug, "Deconstructing FileDistributorRPC");
+}
diff --git a/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
new file mode 100644
index 00000000000..e527fc2aeca
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/rpc/filedistributorrpc.h
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <memory>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/noncopyable.hpp>
+
+#include "fileprovider.h"
+
+namespace filedistribution {
+
+class FileDistributorRPC : boost::noncopyable,
+ public boost::enable_shared_from_this<FileDistributorRPC>
+{
+ class Server;
+public:
+ FileDistributorRPC(const std::string& connectSpec,
+ const boost::shared_ptr<FileProvider>& provider);
+
+ void start();
+
+ static int get_port(const std::string &spec);
+
+ ~FileDistributorRPC();
+private:
+ std::unique_ptr<Server> _server;
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
new file mode 100644
index 00000000000..a95b50fc0f2
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/rpc/fileprovider.h
@@ -0,0 +1,39 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include<boost/optional.hpp>
+#include<boost/filesystem/path.hpp>
+#include<boost/signals2.hpp>
+
+namespace filedistribution {
+
+class FileProvider
+{
+public:
+ typedef boost::signals2::signal<void (const std::string& /* fileReference */,
+ const boost::filesystem::path&)>
+ DownloadCompletedSignal;
+ typedef DownloadCompletedSignal::slot_type DownloadCompletedHandler;
+
+ enum FailedDownloadReason {
+ FileReferenceDoesNotExist,
+ FileReferenceRemoved
+ };
+
+ typedef boost::signals2::signal<void (const std::string& /* fileReference */,
+ FailedDownloadReason)>
+ DownloadFailedSignal;
+ typedef DownloadFailedSignal::slot_type DownloadFailedHandler;
+
+ virtual boost::optional<boost::filesystem::path> getPath(const std::string& fileReference) = 0;
+ virtual void downloadFile(const std::string& fileReference) = 0;
+
+ virtual ~FileProvider() {}
+
+ //Signals
+ virtual DownloadCompletedSignal& downloadCompleted() = 0;
+ virtual DownloadFailedSignal& downloadFailed() = 0;
+};
+
+} //namespace filedistribution
+
diff --git a/filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt b/filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt
new file mode 100644
index 00000000000..e3b90c464c3
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/rpcconfig/CMakeLists.txt
@@ -0,0 +1,7 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(filedistribution_filedistributorrpcconfig STATIC
+ SOURCES
+ DEPENDS
+)
+vespa_generate_config(filedistribution_filedistributorrpcconfig filedistributorrpc.def)
+install(FILES filedistributorrpc.def DESTINATION var/db/vespa/config_server/serverdb/classes)
diff --git a/filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def b/filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def
new file mode 100644
index 00000000000..ceadd8ae8c8
--- /dev/null
+++ b/filedistribution/src/vespa/filedistribution/rpcconfig/filedistributorrpc.def
@@ -0,0 +1,3 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+namespace=cloud.config.filedistribution
+connectionspec string