aboutsummaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@yahoo-inc.com>2017-11-09 13:14:45 +0100
committerGitHub <noreply@github.com>2017-11-09 13:14:45 +0100
commit8d42bb22e208a754c4e17ac935edd457c81408c6 (patch)
tree7548a20caac4552becafbceeba5fb144c4a59017 /filedistribution
parent82c47fcb7f049faed6990e79bc149e567579c3ac (diff)
Revert "Move file distribution stuff to filedistribution module"
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/pom.xml59
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java153
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java28
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java184
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java155
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java237
6 files changed, 0 insertions, 816 deletions
diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml
index 2a63c1ba962..0a4b75788bc 100644
--- a/filedistribution/pom.xml
+++ b/filedistribution/pom.xml
@@ -15,54 +15,10 @@
<name>${project.artifactId}</name>
<dependencies>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>yolean</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>config-lib</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>defaults</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>config</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>jrt</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>vespajlib</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>vespalog</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
</dependencies>
<build>
<plugins>
@@ -85,21 +41,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>bundle-plugin</artifactId>
- <extensions>true</extensions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <compilerArgs>
- <arg>-Xlint:all</arg>
- <arg>-Werror</arg>
- </compilerArgs>
- </configuration>
- </plugin>
</plugins>
</build>
</project>
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
deleted file mode 100644
index f3c694f31ab..00000000000
--- a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy.filedistribution;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.yahoo.config.FileReference;
-import com.yahoo.log.LogLevel;
-import com.yahoo.vespa.config.ConnectionPool;
-import com.yahoo.vespa.defaults.Defaults;
-import com.yahoo.yolean.Exceptions;
-
-import java.io.File;
-import java.time.Duration;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-
-/**
- * Handles downloads of files (file references only for now)
- *
- * @author hmusum
- */
-public class FileDownloader {
-
- private final static Logger log = Logger.getLogger(FileDownloader.class.getName());
-
- private final File downloadDirectory;
- private final Duration timeout;
- private final FileReferenceDownloader fileReferenceDownloader;
-
- public FileDownloader(ConnectionPool connectionPool) {
- this(connectionPool,
- new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")),
- Duration.ofMinutes(15));
- }
-
- FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) {
- this.downloadDirectory = downloadDirectory;
- this.timeout = timeout;
- this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout);
- }
-
- public Optional<File> getFile(FileReference fileReference) {
- Objects.requireNonNull(fileReference, "file reference cannot be null");
- File directory = new File(downloadDirectory, fileReference.value());
- log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
-
- Optional<File> file = getFileFromFileSystem(fileReference, directory);
- if (file.isPresent()) {
- return file;
- } else {
- log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
- directory.getAbsolutePath() + ", starting download");
- return queueForDownload(fileReference, timeout);
- }
- }
-
- public void queueForDownload(List<FileReference> fileReferences) {
- fileReferences.forEach(this::queueForDownload);
- }
-
- public void receiveFile(FileReference fileReference, String filename, byte[] content) {
- fileReferenceDownloader.receiveFile(fileReference, filename, content);
- }
-
- double downloadStatus(FileReference fileReference) {
- return fileReferenceDownloader.downloadStatus(fileReference.value());
- }
-
- public Map<FileReference, Double> downloadStatus() {
- return fileReferenceDownloader.downloadStatus();
- }
-
- File downloadDirectory() {
- return downloadDirectory;
- }
-
- private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) {
- File[] files = directory.listFiles();
- if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
- if (files.length != 1) {
- throw new RuntimeException("More than one file in '" + fileReference.value() +
- "', expected only one, unable to proceed");
- }
- File file = files[0];
- if (!file.exists()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "' does not exist");
- } else if (!file.canRead()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "'exists, but unable to read it");
- } else {
- fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0);
- return Optional.of(file);
- }
- }
- return Optional.empty();
- }
-
- private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) {
- if (fileReferenceDownloader.isDownloading(fileReference)) {
- log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
- ListenableFuture<Optional<File>> future =
- fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " +
- Exceptions.toMessageString(e));
- }
- }
-
- SettableFuture<Optional<File>> future = SettableFuture.create();
- queueForDownload(new FileReferenceDownload(fileReference, future));
- log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
-
- try {
- Optional<File> fileDownloaded;
- try {
- log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
- fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
- log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
- } catch (TimeoutException e) {
- log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
- return Optional.empty();
- }
- return fileDownloaded;
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Could not download '" + fileReference.value() + "'");
- }
- }
-
- // We don't care about the future in this call
- private synchronized void queueForDownload(FileReference fileReference) {
- queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
- }
-
- private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) {
- fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
- }
-
- Set<FileReference> queuedDownloads() {
- return fileReferenceDownloader.queuedDownloads();
- }
-
-}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java
deleted file mode 100644
index ce5a30dc7ad..00000000000
--- a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-package com.yahoo.vespa.config.proxy.filedistribution;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.yahoo.config.FileReference;
-
-import java.io.File;
-import java.util.Optional;
-
-public class FileReferenceDownload {
- private final FileReference fileReference;
- private final SettableFuture<Optional<File>> future;
-
- FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) {
- this.fileReference = fileReference;
- this.future = future;
- }
-
- FileReference fileReference() {
- return fileReference;
- }
-
- SettableFuture<Optional<File>> future() {
- return future;
- }
-
-}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
deleted file mode 100644
index 611ad67a5d8..00000000000
--- a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
+++ /dev/null
@@ -1,184 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy.filedistribution;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.yahoo.concurrent.DaemonThreadFactory;
-import com.yahoo.config.FileReference;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.StringValue;
-import com.yahoo.log.LogLevel;
-import com.yahoo.vespa.config.Connection;
-import com.yahoo.vespa.config.ConnectionPool;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-
-/**
- * Downloads file reference using rpc requests to config server and keeps track of files being downloaded
- * <p>
- * Some methods are synchronized to make sure access to downloads is atomic
- *
- * @author hmusum
- */
-// TODO: Add retries when a config server does not have a file reference
-// TODO: Handle shutdown of executors
-class FileReferenceDownloader {
-
- private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
- private final static Duration rpcTimeout = Duration.ofSeconds(10);
-
- private final File downloadDirectory;
- private final ExecutorService downloadExecutor =
- Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
- private ExecutorService readFromQueueExecutor =
- Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue"));
- private final ConnectionPool connectionPool;
- private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>();
- private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
- private final Map<FileReference, Double> downloadStatus = new HashMap<>();
- private final Duration downloadTimeout;
-
- FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
- this.downloadDirectory = downloadDirectory;
- this.connectionPool = connectionPool;
- this.downloadTimeout = timeout;
- if (connectionPool != null)
- readFromQueueExecutor.submit(this::readFromQueue);
- }
-
- private synchronized Optional<File> startDownload(FileReference fileReference,
- Duration timeout,
- FileReferenceDownload fileReferenceDownload)
- throws ExecutionException, InterruptedException, TimeoutException {
- downloads.put(fileReference, fileReferenceDownload);
- setDownloadStatus(fileReference.value(), 0.0);
- if (startDownloadRpc(fileReference))
- return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
- else {
- fileReferenceDownload.future().setException(new RuntimeException("Failed getting file"));
- downloads.remove(fileReference);
- return Optional.empty();
- }
- }
-
- synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
- downloadQueue.add(fileReferenceDownload);
- }
-
- void receiveFile(FileReference fileReference, String filename, byte[] content) {
- File fileReferenceDir = new File(downloadDirectory, fileReference.value());
- try {
- Files.createDirectories(fileReferenceDir.toPath());
- File file = new File(fileReferenceDir, filename);
- log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
- Files.write(file.toPath(), content);
- completedDownloading(fileReference, file);
- } catch (IOException e) {
- log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
- throw new RuntimeException("Failed writing file: ", e);
- }
- }
-
- synchronized Set<FileReference> queuedDownloads() {
- return downloadQueue.stream()
- .map(FileReferenceDownload::fileReference)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- private void readFromQueue() {
- do {
- FileReferenceDownload fileReferenceDownload = downloadQueue.poll();
- if (fileReferenceDownload == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) { /* ignore for now */}
- } else {
- log.log(LogLevel.INFO, "Polling queue, found file reference '" +
- fileReferenceDownload.fileReference().value() + "' to download");
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
- }
- } while (true);
- }
-
- private synchronized void completedDownloading(FileReference fileReference, File file) {
- downloads.get(fileReference).future().set(Optional.of(file));
- downloadStatus.put(fileReference, 100.0);
- }
-
- private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
- Connection connection = connectionPool.getCurrent();
- Request request = new Request("filedistribution.serveFile");
- request.parameters().add(new StringValue(fileReference.value()));
-
- execute(request, connection);
- if (validateResponse(request)) {
- log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection);
- if (request.returnValues().get(0).asInt32() == 0)
- log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress());
- else
- log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress());
- return true;
- } else {
- log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress());
- connection.setError(request.errorCode());
- // TODO: Retry with another config server
- return false;
- }
- }
-
- synchronized boolean isDownloading(FileReference fileReference) {
- return downloads.containsKey(fileReference);
- }
-
- synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
- FileReferenceDownload fileReferenceDownload = downloads.get(fileReference);
- fileReferenceDownload.future().addListener(runnable, downloadExecutor);
- return fileReferenceDownload.future();
- }
-
- private void execute(Request request, Connection connection) {
- connection.invokeSync(request, (double) rpcTimeout.getSeconds());
- }
-
- private boolean validateResponse(Request request) {
- if (request.isError()) {
- return false;
- } else if (request.returnValues().size() == 0) {
- return false;
- } else if (!request.checkReturnTypes("is")) { // TODO: Do not hard-code return type
- log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage());
- return false;
- }
- return true;
- }
-
- double downloadStatus(String file) {
- return downloadStatus.getOrDefault(new FileReference(file), 0.0);
- }
-
- void setDownloadStatus(String file, double percentageDownloaded) {
- downloadStatus.put(new FileReference(file), percentageDownloaded);
- }
-
- Map<FileReference, Double> downloadStatus() {
- return ImmutableMap.copyOf(downloadStatus);
- }
-
-}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java
deleted file mode 100644
index edebbf780f3..00000000000
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java
+++ /dev/null
@@ -1,155 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-package com.yahoo.vespa.filedistribution;
-
-import com.yahoo.config.FileReference;
-import com.yahoo.jrt.DoubleArray;
-import com.yahoo.jrt.Int32Value;
-import com.yahoo.jrt.Method;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.StringArray;
-import com.yahoo.jrt.StringValue;
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.log.LogLevel;
-import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * An RPC server that handles file distribution requests.
- *
- * @author hmusum
- */
-public class RpcServer {
-
- private final static Logger log = Logger.getLogger(RpcServer.class.getName());
-
- private final Supervisor supervisor;
- private final FileDownloader downloader;
-
- public RpcServer(Supervisor supervisor, FileDownloader downloader) {
- this.supervisor = supervisor;
- this.downloader = downloader;
- declareFileDistributionMethods();
- }
-
- private void declareFileDistributionMethods() {
- // Legacy method, needs to be the same name as used in filedistributor
- supervisor.addMethod(new Method("waitFor", "s", "s",
- this, "getFile")
- .methodDesc("get path to file reference")
- .paramDesc(0, "file reference", "file reference")
- .returnDesc(0, "path", "path to file"));
- supervisor.addMethod(new Method("filedistribution.getFile", "s", "s",
- this, "getFile")
- .methodDesc("get path to file reference")
- .paramDesc(0, "file reference", "file reference")
- .returnDesc(0, "path", "path to file"));
- supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD",
- this, "getActiveFileReferencesStatus")
- .methodDesc("download status for file references")
- .returnDesc(0, "file references", "array of file references")
- .returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
- supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i",
- this, "setFileReferencesToDownload")
- .methodDesc("set which file references to download")
- .paramDesc(0, "file references", "file reference to download")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
- supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
- this, "receiveFile")
- .methodDesc("receive file reference content")
- .paramDesc(0, "file references", "file reference to download")
- .paramDesc(1, "filename", "filename")
- .paramDesc(2, "content", "array of bytes")
- .paramDesc(3, "hash", "xx64hash of the file content")
- .paramDesc(4, "errorcode", "Error code. 0 if none")
- .paramDesc(5, "error-description", "Error description.")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
- }
-
- //---------------- RPC methods ------------------------------------
- // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
- private static final int baseErrorCode = 0x10000;
- private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000;
-
- private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode;
- private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1;
- private static final int fileReferenceInternalError = fileReferenceRemoved + 1;
-
- @SuppressWarnings({"UnusedDeclaration"})
- public final void getFile(Request req) {
- req.detach();
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'");
- Optional<File> pathToFile = downloader.getFile(fileReference);
- try {
- if (pathToFile.isPresent()) {
- req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath()));
- log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get());
- } else {
- log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
- req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
- }
- } catch (Throwable e) {
- log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage());
- req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed");
- }
- req.returnRequest();
- }
-
- @SuppressWarnings({"UnusedDeclaration"})
- public final void getActiveFileReferencesStatus(Request req) {
- Map<FileReference, Double> downloadStatus = downloader.downloadStatus();
-
- String[] fileRefArray = new String[downloadStatus.keySet().size()];
- fileRefArray = downloadStatus.keySet().stream()
- .map(FileReference::value)
- .collect(Collectors.toList())
- .toArray(fileRefArray);
-
- double[] downloadStatusArray = new double[downloadStatus.values().size()];
- int i = 0;
- for (Double d : downloadStatus.values()) {
- downloadStatusArray[i++] = d;
- }
-
- req.returnValues().add(new StringArray(fileRefArray));
- req.returnValues().add(new DoubleArray(downloadStatusArray));
- }
-
- @SuppressWarnings({"UnusedDeclaration"})
- public final void setFileReferencesToDownload(Request req) {
- String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
- List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
- .map(FileReference::new)
- .collect(Collectors.toList());
- downloader.queueForDownload(fileReferences);
-
- req.returnValues().add(new Int32Value(0));
- }
-
- @SuppressWarnings({"UnusedDeclaration"})
- public final void receiveFile(Request req) {
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- String filename = req.parameters().get(1).asString();
- byte[] content = req.parameters().get(2).asData();
- long xxhash = req.parameters().get(3).asInt64();
- int errorCode = req.parameters().get(3).asInt32();
- String errorDescription = req.parameters().get(4).asString();
-
- if (errorCode == 0) {
- //downloader.receive(fileReference, filename, content);
- req.returnValues().add(new Int32Value(0));
- } else {
- log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
- req.returnValues().add(new Int32Value(1));
- // TODO: Add error description return value here too?
- }
- }
-}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
deleted file mode 100644
index 626665da236..00000000000
--- a/filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy.filedistribution;
-
-import com.yahoo.config.FileReference;
-import com.yahoo.io.IOUtils;
-import com.yahoo.jrt.Int32Value;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.RequestWaiter;
-import com.yahoo.jrt.StringValue;
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.jrt.Transport;
-import com.yahoo.text.Utf8;
-import com.yahoo.vespa.config.Connection;
-import com.yahoo.vespa.config.ConnectionPool;
-import com.yahoo.vespa.filedistribution.RpcServer;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Optional;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class FileDownloaderTest {
-
- private MockConnection connection;
- private FileDownloader fileDownloader;
-
- @Before
- public void setup() {
- try {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- connection = new MockConnection();
- fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000));
- RpcServer rpcServer = new RpcServer(new Supervisor(new Transport()), fileDownloader);
- } catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void getFile() throws IOException {
- File downloadDir = fileDownloader.downloadDirectory();
-
- {
- // fileReference already exists on disk, does not have to be downloaded
-
- String fileReferenceString = "foo";
- String filename = "foo.jar";
- File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString);
- FileReference fileReference = new FileReference(fileReferenceString);
- writeFileReference(downloadDir, fileReferenceString, filename);
-
- // Check that we get correct path and content when asking for file reference
- Optional<File> pathToFile = fileDownloader.getFile(fileReference);
- assertTrue(pathToFile.isPresent());
- String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath();
- assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile);
- assertEquals("content", IOUtils.readFile(pathToFile.get()));
-
- // Verify download status when downloaded
- assertDownloadStatus(fileDownloader, fileReference, 100.0);
- }
-
- {
- // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file)
-
- connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler());
-
- FileReference fileReference = new FileReference("bar");
- File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value());
- assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
-
- // Verify download status when unable to download
- assertDownloadStatus(fileDownloader, fileReference, 0.0);
- }
-
- {
- // fileReference does not exist on disk, needs to be downloaded)
-
- FileReference fileReference = new FileReference("fileReference");
- File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value());
- assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
-
- // Verify download status
- assertDownloadStatus(fileDownloader, fileReference, 0.0);
-
- // Receives fileReference, should return and make it available to caller
- String filename = "abc.jar";
- fileDownloader.receiveFile(fileReference, filename, Utf8.toBytes("some other content"));
- Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
-
- assertTrue(downloadedFile.isPresent());
- File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
- assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath());
- assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
-
- // Verify download status when downloaded
- assertDownloadStatus(fileDownloader, fileReference, 100.0);
- }
- }
-
- @Test
- public void setFilesToDownload() throws IOException {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
- FileReference foo = new FileReference("foo");
- FileReference bar = new FileReference("bar");
- List<FileReference> fileReferences = Arrays.asList(foo, bar);
- fileDownloader.queueForDownload(fileReferences);
-
- // All requested file references should be in queue (since FileDownloader was created without ConnectionPool)
- assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads()));
-
- // Verify download status
- assertDownloadStatus(fileDownloader, foo, 0.0);
- assertDownloadStatus(fileDownloader, bar, 0.0);
- }
-
- private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException {
- File file = new File(new File(dir, fileReferenceString), fileName);
- IOUtils.writeFile(file, "content", false);
- }
-
- private File fileReferenceFullPath(File dir, String fileReferenceString) {
- return new File(dir, fileReferenceString);
- }
-
- private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) {
- double downloadStatus = fileDownloader.downloadStatus(fileReference);
- assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
- }
-
- private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {
-
- private ResponseHandler responseHandler;
-
- MockConnection() {
- this(new FileReferenceFoundResponseHandler());
- }
-
- MockConnection(ResponseHandler responseHandler) {
- this.responseHandler = responseHandler;
- }
-
- @Override
- public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) {
- responseHandler.request(request);
- }
-
- @Override
- public void invokeSync(Request request, double jrtTimeout) {
- responseHandler.request(request);
- }
-
- @Override
- public void setError(int errorCode) {
- }
-
- @Override
- public void setSuccess() {
- }
-
- @Override
- public String getAddress() {
- return null;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void setError(Connection connection, int errorCode) {
- connection.setError(errorCode);
- }
-
- @Override
- public Connection getCurrent() {
- return this;
- }
-
- @Override
- public Connection setNewCurrentConnection() {
- return this;
- }
-
- @Override
- public int getSize() {
- return 1;
- }
-
- public void setResponseHandler(ResponseHandler responseHandler) {
- this.responseHandler = responseHandler;
- }
-
- static class FileReferenceFoundResponseHandler implements ResponseHandler {
-
- @Override
- public void request(Request request) {
- if (request.methodName().equals("filedistribution.serveFile")) {
- request.returnValues().add(new Int32Value(0));
- request.returnValues().add(new StringValue("OK"));
- }
- }
- }
-
- static class UnknownFileReferenceResponseHandler implements ResponseHandler {
-
- @Override
- public void request(Request request) {
- if (request.methodName().equals("filedistribution.serveFile")) {
- request.returnValues().add(new Int32Value(1));
- request.returnValues().add(new StringValue("Internal error"));
- }
- }
- }
-
- public interface ResponseHandler {
-
- void request(Request request);
-
- }
-
- }
-
-}