summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-11-02 14:46:55 +0100
committerHarald Musum <musum@oath.com>2017-11-02 14:46:55 +0100
commit0ade8429cc03ee3bee7800f6b709374c1be0929c (patch)
tree32277bb9005f7d521993c24b42e071bd9353f44d /config-proxy
parent648a78cd05b269b74cbb819e8d8ed9c7110e8b50 (diff)
Add file distribution skeleton to config proxy
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/pom.xml4
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java202
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java23
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java98
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java3
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java79
6 files changed, 335 insertions, 74 deletions
diff --git a/config-proxy/pom.xml b/config-proxy/pom.xml
index aa9783f60a3..8a7b6f253e2 100644
--- a/config-proxy/pom.xml
+++ b/config-proxy/pom.xml
@@ -57,6 +57,10 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index f92e0e04974..1fced0b1e3d 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
+import com.yahoo.config.FileReference;
import com.yahoo.jrt.*;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.*;
@@ -9,19 +10,20 @@ import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
+import java.io.File;
import java.lang.*;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
- * A proxy server that handles RPC config requests.
+ * An RPC server that handles config and file distribution requests.
*
* @author hmusum
- * @since 5.1
*/
+// TODO: Rename now that it also support file distribution request
public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
@@ -34,7 +36,8 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) {
this.proxyServer = proxyServer;
this.spec = spec;
- setUp();
+ declareConfigMethods();
+ declareFileDistributionMethods();
}
public void run() {
@@ -57,8 +60,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
return spec;
}
- private void setUp() {
- // The getConfig method in this class will handle RPC calls for getting config
+ private void declareConfigMethods() {
supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this, "getConfigV3"));
supervisor.addMethod(new Method("ping", "", "i",
this, "ping")
@@ -103,6 +105,32 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
.returnDesc(0, "ret", "Empty string or error message"));
}
+ private void declareFileDistributionMethods() {
+ // Legacy method, needs to be the same name as used in filedistributor
+ supervisor.addMethod(new Method("waitFor", "s", "s",
+ this, "getFile")
+ .methodDesc("wait for file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getFile", "s", "s",
+ this, "getFile")
+ .methodDesc("wait for file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD",
+ this, "getActiveFileReferencesStatus")
+ .methodDesc("download status for file references")
+ .returnDesc(0, "file references", "array of file references")
+ .returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
+ supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i",
+ this, "setFileReferencesToDownload")
+ .methodDesc("set which file references to download")
+ .paramDesc(0, "file references", "file reference to download")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ }
+
+ //---------------- RPC methods ------------------------------------
+
/**
* Handles RPC method "config.v3.getConfig" requests.
*
@@ -118,54 +146,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
}
}
- private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
- Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions();
- if (supportedProtocolVersions.contains(request.getProtocolVersion())) {
- return true;
- } else {
- String message = "Illegal protocol version " + request.getProtocolVersion() +
- " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported";
- log.log(LogLevel.ERROR, message);
- request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message);
- }
- return false;
- }
-
- private void preHandle(Request req) {
- proxyServer.getStatistics().incRpcRequests();
- req.detach();
- req.target().addWatcher(this);
- }
-
- /**
- * Handles all versions of "getConfig" requests.
- *
- * @param request a Request
- */
- private void getConfigImpl(JRTServerConfigRequest request) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
- log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5());
- if (!request.validateParameters()) {
- // Error code is set in verifyParameters if parameters are not OK.
- log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
- returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
- return;
- }
- try {
- RawConfig config = proxyServer.resolveConfig(request);
- if (config == null) {
- log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
- } else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
- returnOkResponse(request, config);
- } else {
- log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response");
- }
- } catch (Exception e) {
- e.printStackTrace();
- returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
-
/**
* Returns 0 if server is alive.
*
@@ -207,14 +187,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
req.returnValues().add(new StringArray(ret));
}
- private String printSourceConnections() {
- StringBuilder sb = new StringBuilder();
- for (String s : proxyServer.getSourceConnections()) {
- sb.append(s).append("\n");
- }
- return sb.toString();
- }
-
@SuppressWarnings({"UnusedDeclaration"})
public final void updateSources(Request req) {
String sources = req.parameters().get(0).asString();
@@ -263,6 +235,108 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
}
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getFile(Request req) {
+ // TODO: Detach to avoid holding transport thread
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String pathToFile = proxyServer.fileDownloader()
+ .getFile(fileReference)
+ .orElseGet(() -> new File(""))
+ .getAbsolutePath();
+
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile);
+ req.returnValues().add(new StringValue(pathToFile));
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getActiveFileReferencesStatus(Request req) {
+ Map<FileReference, Double> downloadStatus = proxyServer.fileDownloader().downloadStatus();
+
+ String[] fileRefArray = new String[downloadStatus.keySet().size()];
+ fileRefArray = downloadStatus.keySet().stream()
+ .map(FileReference::value)
+ .collect(Collectors.toList())
+ .toArray(fileRefArray);
+
+ double[] downloadStatusArray = new double[downloadStatus.values().size()];
+ int i = 0;
+ for (Double d : downloadStatus.values()) {
+ downloadStatusArray[i++] = d;
+ }
+
+ req.returnValues().add(new StringArray(fileRefArray));
+ req.returnValues().add(new DoubleArray(downloadStatusArray));
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void setFileReferencesToDownload(Request req) {
+ String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
+ List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
+ .map(FileReference::new)
+ .collect(Collectors.toList());
+ proxyServer.fileDownloader().queueForDownload(fileReferences);
+
+ req.returnValues().add(new Int32Value(0));
+ }
+
+ //----------------------------------------------------
+
+ private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
+ Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions();
+ if (supportedProtocolVersions.contains(request.getProtocolVersion())) {
+ return true;
+ } else {
+ String message = "Illegal protocol version " + request.getProtocolVersion() +
+ " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported";
+ log.log(LogLevel.ERROR, message);
+ request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message);
+ }
+ return false;
+ }
+
+ private void preHandle(Request req) {
+ proxyServer.getStatistics().incRpcRequests();
+ req.detach();
+ req.target().addWatcher(this);
+ }
+
+ /**
+ * Handles all versions of "getConfig" requests.
+ *
+ * @param request a Request
+ */
+ private void getConfigImpl(JRTServerConfigRequest request) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
+ log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5());
+ if (!request.validateParameters()) {
+ // Error code is set in verifyParameters if parameters are not OK.
+ log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
+ returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ return;
+ }
+ try {
+ RawConfig config = proxyServer.resolveConfig(request);
+ if (config == null) {
+ log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
+ } else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
+ returnOkResponse(request, config);
+ } else {
+ log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+
+ private String printSourceConnections() {
+ StringBuilder sb = new StringBuilder();
+ for (String s : proxyServer.getSourceConnections()) {
+ sb.append(s).append("\n");
+ }
+ return sb.toString();
+ }
+
final void listCachedConfig(Request req, boolean full) {
String[] ret;
MemoryCache cache = proxyServer.getMemoryCache();
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index 74f32f6d052..4ee77beb2d7 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -2,7 +2,6 @@
package com.yahoo.vespa.config.proxy;
import com.yahoo.concurrent.DaemonThreadFactory;
-import com.yahoo.config.subscription.ConfigSource;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
@@ -12,6 +11,7 @@ import com.yahoo.log.event.Event;
import com.yahoo.system.CatchSigTerm;
import com.yahoo.vespa.config.*;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader;
import java.util.List;
import java.util.concurrent.Executors;
@@ -45,7 +45,7 @@ public class ProxyServer implements Runnable {
private final ConfigProxyRpcServer rpcServer;
final DelayedResponses delayedResponses;
- private ConfigSource configSource;
+ private ConfigSourceSet configSource;
private volatile ConfigSourceClient configClient;
@@ -55,6 +55,7 @@ public class ProxyServer implements Runnable {
private static final double timingValuesRatio = 0.8;
private final static TimingValues defaultTimingValues;
private final boolean delayedResponseHandling;
+ private final FileDownloader fileDownloader;
private volatile Mode mode = new Mode(DEFAULT);
@@ -68,7 +69,7 @@ public class ProxyServer implements Runnable {
defaultTimingValues = tv;
}
- private ProxyServer(Spec spec, DelayedResponses delayedResponses, ConfigSource source,
+ private ProxyServer(Spec spec, DelayedResponses delayedResponses, ConfigSourceSet source,
ConfigProxyStatistics statistics, TimingValues timingValues,
boolean delayedResponseHandling, MemoryCache memoryCache,
ConfigSourceClient configClient) {
@@ -82,17 +83,17 @@ public class ProxyServer implements Runnable {
this.rpcServer = createRpcServer(spec);
clientUpdater = new ClientUpdater(rpcServer, statistics, delayedResponses);
this.configClient = createClient(clientUpdater, delayedResponses, source, timingValues, memoryCache, configClient);
+ this.fileDownloader = new FileDownloader(source);
}
static ProxyServer createTestServer(ConfigSourceSet source) {
return createTestServer(source, null, new MemoryCache(), new ConfigProxyStatistics());
}
- static ProxyServer createTestServer(ConfigSource source,
+ static ProxyServer createTestServer(ConfigSourceSet source,
ConfigSourceClient configSourceClient,
MemoryCache memoryCache,
- ConfigProxyStatistics statistics)
- {
+ ConfigProxyStatistics statistics) {
final boolean delayedResponseHandling = false;
return new ProxyServer(null, new DelayedResponses(statistics),
source, statistics, defaultTimingValues(), delayedResponseHandling,
@@ -153,10 +154,10 @@ public class ProxyServer implements Runnable {
}
private ConfigSourceClient createClient(ClientUpdater clientUpdater, DelayedResponses delayedResponses,
- Object source, TimingValues timingValues,
+ ConfigSourceSet source, TimingValues timingValues,
MemoryCache memoryCache, ConfigSourceClient client) {
return (client == null)
- ? new RpcConfigSourceClient((ConfigSourceSet) source, clientUpdater, memoryCache, timingValues, delayedResponses)
+ ? new RpcConfigSourceClient(source, clientUpdater, memoryCache, timingValues, delayedResponses)
: client;
}
@@ -165,7 +166,7 @@ public class ProxyServer implements Runnable {
}
private RpcConfigSourceClient createRpcClient() {
- return new RpcConfigSourceClient((ConfigSourceSet) configSource, clientUpdater, memoryCache, timingValues, delayedResponses);
+ return new RpcConfigSourceClient(configSource, clientUpdater, memoryCache, timingValues, delayedResponses);
}
private void setupSigTermHandler() {
@@ -279,4 +280,8 @@ public class ProxyServer implements Runnable {
flush();
configClient = createRpcClient();
}
+
+ FileDownloader fileDownloader() {
+ return fileDownloader;
+ }
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
new file mode 100644
index 00000000000..9074527e4e4
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
@@ -0,0 +1,98 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.google.common.collect.ImmutableSet;
+import com.yahoo.config.FileReference;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.defaults.Defaults;
+
+import java.io.File;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.*;
+import java.util.logging.Logger;
+
+/**
+ * Keeps track of files to download and download status
+ *
+ * @author hmusum
+ */
+public class FileDownloader {
+ private final static Logger log = Logger.getLogger(FileDownloader.class.getName());
+
+
+ private final String filesDirectory;
+ private final ConfigSourceSet configSourceSet;
+ private final Duration timeout;
+ private final Map<FileReference, Double> downloadStatus = new HashMap<>();
+ private final Set<FileReference> queuedForDownload = new LinkedHashSet<>();
+
+ public FileDownloader(ConfigSourceSet configSourceSet) {
+ this(configSourceSet,
+ Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution"),
+ Duration.ofMinutes(15));
+ }
+
+ FileDownloader(ConfigSourceSet configSourceSet, String filesDirectory, Duration timeout) {
+ this.configSourceSet = configSourceSet;
+ this.filesDirectory = filesDirectory;
+ this.timeout = timeout;
+ }
+
+ public Optional<File> getFile(FileReference fileReference) {
+ Objects.requireNonNull(fileReference, "file reference cannot be null");
+ File directory = new File(filesDirectory, fileReference.value()); // directory with one file
+
+ log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
+ Instant end = Instant.now().plus(timeout);
+ do {
+ File[] files = directory.listFiles();
+ if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
+ if (files.length != 1) {
+ throw new RuntimeException("More than one file in '" + fileReference.value() +
+ "', expected only one, unable to proceed");
+ }
+ File file = files[0];
+ if (!file.exists()) {
+ throw new RuntimeException("File with reference '" + fileReference.value() +
+ "' does not exist");
+ } else if (!file.canRead()) {
+ throw new RuntimeException("File with reference '" + fileReference.value() +
+ "'exists, but unable to read it");
+ } else {
+ downloadStatus.put(fileReference, 100.0);
+ return Optional.of(file);
+ }
+ } else {
+ queueForDownload(fileReference);
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } while (Instant.now().isBefore(end));
+
+ return Optional.empty();
+ }
+
+ public Map<FileReference, Double> downloadStatus() {
+ return downloadStatus;
+ }
+
+ public void queueForDownload(List<FileReference> fileReferences) {
+ fileReferences.forEach(this::queueForDownload);
+ }
+
+ private void queueForDownload(FileReference fileReference) {
+ log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download ");
+ queuedForDownload.add(fileReference);
+ downloadStatus.put(fileReference, 0.0);
+ }
+
+ ImmutableSet<FileReference> queuedForDownload() {
+ return ImmutableSet.copyOf(queuedForDownload);
+ }
+
+} \ No newline at end of file
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java
index 52de4d1942a..dc9a3408510 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.config.proxy;
import com.yahoo.config.subscription.ConfigSource;
+import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.RawConfig;
@@ -14,7 +15,7 @@ import java.util.HashMap;
* @author hmusum
* @since 5.1.10
*/
-class MockConfigSource implements ConfigSource {
+class MockConfigSource extends ConfigSourceSet {
private final HashMap<ConfigKey<?>, RawConfig> backing = new HashMap<>();
private final ClientUpdater clientUpdater;
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
new file mode 100644
index 00000000000..ea880e451b6
--- /dev/null
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -0,0 +1,79 @@
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.io.IOUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FileDownloaderTest {
+ private static final ConfigSourceSet configSourceSet = new ConfigSourceSet();
+
+ @Test
+ public void download() throws IOException {
+ File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200));
+
+ // Write a file to download directory to simulate download going OK
+ String fileReferenceString = "somehash";
+ String fileName = "foo.jar";
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString);
+ FileReference fileReference = writeFileReference(downloadDir, fileReferenceString, fileName);
+
+ // Check that we get correct path and content when asking for file reference
+ Optional<File> pathToFile = fileDownloader.getFile(fileReference);
+ assertTrue(pathToFile.isPresent());
+ String downloadedFile = new File(fileReferenceFullPath, fileName).getAbsolutePath();
+ assertEquals(new File(fileReferenceFullPath, fileName).getAbsolutePath(), downloadedFile);
+ assertEquals("content", IOUtils.readFile(pathToFile.get()));
+
+ // Verify download status
+ Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus();
+ assertEquals(1, downloadStatus.size());
+ assertDownloadStatus(Collections.singletonList(fileReference), downloadStatus.entrySet().iterator().next(), 100.0);
+
+ // Non-existing file
+ assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(new FileReference("doesnotexist")).isPresent());
+ }
+
+ @Test
+ public void setFilesToDownload() throws IOException {
+ File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200));
+ List<FileReference> fileReferences = Arrays.asList(new FileReference("foo"), new FileReference("bar"));
+ fileDownloader.queueForDownload(fileReferences);
+
+ assertEquals(fileReferences, fileDownloader.queuedForDownload().asList());
+
+ // Verify download status
+ Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus();
+ assertEquals(2, downloadStatus.size());
+
+ assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0);
+ assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0);
+ }
+
+ private FileReference writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException {
+ File file = new File(new File(dir, fileReferenceString), fileName);
+ IOUtils.writeFile(file, "content", false);
+ return new FileReference(fileReferenceString);
+ }
+
+ private File fileReferenceFullPath(File dir, String fileReferenceString) {
+ return new File(dir, fileReferenceString);
+ }
+
+ private void assertDownloadStatus(List<FileReference> fileReferences, Map.Entry<FileReference, Double> entry, double expectedDownloadStatus) {
+ assertTrue(fileReferences.contains(new FileReference(entry.getKey().value())));
+ assertEquals(expectedDownloadStatus, entry.getValue(), 0.0001);
+ }
+}