From 323df5546118baeae7a38de5d9319e0107229440 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Thu, 9 Nov 2017 10:53:13 +0100 Subject: Move file distribution stuff to filedistribution module --- filedistribution/pom.xml | 59 +++++ .../proxy/filedistribution/FileDownloader.java | 153 +++++++++++++ .../filedistribution/FileReferenceDownload.java | 28 +++ .../filedistribution/FileReferenceDownloader.java | 184 ++++++++++++++++ .../yahoo/vespa/filedistribution/RpcServer.java | 155 ++++++++++++++ .../proxy/filedistribution/FileDownloaderTest.java | 237 +++++++++++++++++++++ 6 files changed, 816 insertions(+) create mode 100644 filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java create mode 100644 filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java create mode 100644 filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java create mode 100644 filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java create mode 100644 filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java (limited to 'filedistribution') diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml index 0a4b75788bc..2a63c1ba962 100644 --- a/filedistribution/pom.xml +++ b/filedistribution/pom.xml @@ -14,11 +14,55 @@ jar ${project.artifactId} + + junit + junit + test + + + com.yahoo.vespa + yolean + ${project.version} + com.yahoo.vespa config-lib ${project.version} + + com.yahoo.vespa + defaults + ${project.version} + + + com.yahoo.vespa + config + ${project.version} + + + com.yahoo.vespa + jrt + ${project.version} + + + com.yahoo.vespa + vespajlib + ${project.version} + + + com.yahoo.vespa + vespalog + ${project.version} + + + org.hamcrest + hamcrest-core + test + + + com.google.guava + guava + @@ -41,6 +85,21 @@ + + com.yahoo.vespa + bundle-plugin + true + + + org.apache.maven.plugins + maven-compiler-plugin + + + -Xlint:all + -Werror + + + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java new file mode 100644 index 00000000000..f3c694f31ab --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java @@ -0,0 +1,153 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; + +import java.io.File; +import java.time.Duration; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +/** + * Handles downloads of files (file references only for now) + * + * @author hmusum + */ +public class FileDownloader { + + private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); + + private final File downloadDirectory; + private final Duration timeout; + private final FileReferenceDownloader fileReferenceDownloader; + + public FileDownloader(ConnectionPool connectionPool) { + this(connectionPool, + new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), + Duration.ofMinutes(15)); + } + + FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) { + this.downloadDirectory = downloadDirectory; + this.timeout = timeout; + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout); + } + + public Optional getFile(FileReference fileReference) { + Objects.requireNonNull(fileReference, "file reference cannot be null"); + File directory = new File(downloadDirectory, fileReference.value()); + log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); + + Optional file = getFileFromFileSystem(fileReference, directory); + if (file.isPresent()) { + return file; + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + + directory.getAbsolutePath() + ", starting download"); + return queueForDownload(fileReference, timeout); + } + } + + public void queueForDownload(List fileReferences) { + fileReferences.forEach(this::queueForDownload); + } + + public void receiveFile(FileReference fileReference, String filename, byte[] content) { + fileReferenceDownloader.receiveFile(fileReference, filename, content); + } + + double downloadStatus(FileReference fileReference) { + return fileReferenceDownloader.downloadStatus(fileReference.value()); + } + + public Map downloadStatus() { + return fileReferenceDownloader.downloadStatus(); + } + + File downloadDirectory() { + return downloadDirectory; + } + + private Optional getFileFromFileSystem(FileReference fileReference, File directory) { + File[] files = directory.listFiles(); + if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + if (files.length != 1) { + throw new RuntimeException("More than one file in '" + fileReference.value() + + "', expected only one, unable to proceed"); + } + File file = files[0]; + if (!file.exists()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "' does not exist"); + } else if (!file.canRead()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "'exists, but unable to read it"); + } else { + fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); + return Optional.of(file); + } + } + return Optional.empty(); + } + + private synchronized Optional queueForDownload(FileReference fileReference, Duration timeout) { + if (fileReferenceDownloader.isDownloading(fileReference)) { + log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); + ListenableFuture> future = + fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + + Exceptions.toMessageString(e)); + } + } + + SettableFuture> future = SettableFuture.create(); + queueForDownload(new FileReferenceDownload(fileReference, future)); + log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); + + try { + Optional fileDownloaded; + try { + log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); + fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); + log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); + } catch (TimeoutException e) { + log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); + return Optional.empty(); + } + return fileDownloaded; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Could not download '" + fileReference.value() + "'"); + } + } + + // We don't care about the future in this call + private synchronized void queueForDownload(FileReference fileReference) { + queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); + } + + private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { + fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); + } + + Set queuedDownloads() { + return fileReferenceDownloader.queuedDownloads(); + } + +} diff --git a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java new file mode 100644 index 00000000000..ce5a30dc7ad --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java @@ -0,0 +1,28 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; + +import java.io.File; +import java.util.Optional; + +public class FileReferenceDownload { + private final FileReference fileReference; + private final SettableFuture> future; + + FileReferenceDownload(FileReference fileReference, SettableFuture> future) { + this.fileReference = fileReference; + this.future = future; + } + + FileReference fileReference() { + return fileReference; + } + + SettableFuture> future() { + return future; + } + +} diff --git a/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java new file mode 100644 index 00000000000..611ad67a5d8 --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -0,0 +1,184 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.FileReference; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringValue; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Downloads file reference using rpc requests to config server and keeps track of files being downloaded + *

+ * Some methods are synchronized to make sure access to downloads is atomic + * + * @author hmusum + */ +// TODO: Add retries when a config server does not have a file reference +// TODO: Handle shutdown of executors +class FileReferenceDownloader { + + private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); + private final static Duration rpcTimeout = Duration.ofSeconds(10); + + private final File downloadDirectory; + private final ExecutorService downloadExecutor = + Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); + private ExecutorService readFromQueueExecutor = + Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); + private final ConnectionPool connectionPool; + private final ConcurrentLinkedQueue downloadQueue = new ConcurrentLinkedQueue<>(); + private final Map downloads = new LinkedHashMap<>(); + private final Map downloadStatus = new HashMap<>(); + private final Duration downloadTimeout; + + FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { + this.downloadDirectory = downloadDirectory; + this.connectionPool = connectionPool; + this.downloadTimeout = timeout; + if (connectionPool != null) + readFromQueueExecutor.submit(this::readFromQueue); + } + + private synchronized Optional startDownload(FileReference fileReference, + Duration timeout, + FileReferenceDownload fileReferenceDownload) + throws ExecutionException, InterruptedException, TimeoutException { + downloads.put(fileReference, fileReferenceDownload); + setDownloadStatus(fileReference.value(), 0.0); + if (startDownloadRpc(fileReference)) + return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); + else { + fileReferenceDownload.future().setException(new RuntimeException("Failed getting file")); + downloads.remove(fileReference); + return Optional.empty(); + } + } + + synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { + downloadQueue.add(fileReferenceDownload); + } + + void receiveFile(FileReference fileReference, String filename, byte[] content) { + File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + File file = new File(fileReferenceDir, filename); + log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); + Files.write(file.toPath(), content); + completedDownloading(fileReference, file); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); + throw new RuntimeException("Failed writing file: ", e); + } + } + + synchronized Set queuedDownloads() { + return downloadQueue.stream() + .map(FileReferenceDownload::fileReference) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private void readFromQueue() { + do { + FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); + if (fileReferenceDownload == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { /* ignore for now */} + } else { + log.log(LogLevel.INFO, "Polling queue, found file reference '" + + fileReferenceDownload.fileReference().value() + "' to download"); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + } + } while (true); + } + + private synchronized void completedDownloading(FileReference fileReference, File file) { + downloads.get(fileReference).future().set(Optional.of(file)); + downloadStatus.put(fileReference, 100.0); + } + + private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { + Connection connection = connectionPool.getCurrent(); + Request request = new Request("filedistribution.serveFile"); + request.parameters().add(new StringValue(fileReference.value())); + + execute(request, connection); + if (validateResponse(request)) { + log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection); + if (request.returnValues().get(0).asInt32() == 0) + log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress()); + else + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); + return true; + } else { + log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress()); + connection.setError(request.errorCode()); + // TODO: Retry with another config server + return false; + } + } + + synchronized boolean isDownloading(FileReference fileReference) { + return downloads.containsKey(fileReference); + } + + synchronized ListenableFuture> addDownloadListener(FileReference fileReference, Runnable runnable) { + FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); + fileReferenceDownload.future().addListener(runnable, downloadExecutor); + return fileReferenceDownload.future(); + } + + private void execute(Request request, Connection connection) { + connection.invokeSync(request, (double) rpcTimeout.getSeconds()); + } + + private boolean validateResponse(Request request) { + if (request.isError()) { + return false; + } else if (request.returnValues().size() == 0) { + return false; + } else if (!request.checkReturnTypes("is")) { // TODO: Do not hard-code return type + log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage()); + return false; + } + return true; + } + + double downloadStatus(String file) { + return downloadStatus.getOrDefault(new FileReference(file), 0.0); + } + + void setDownloadStatus(String file, double percentageDownloaded) { + downloadStatus.put(new FileReference(file), percentageDownloaded); + } + + Map downloadStatus() { + return ImmutableMap.copyOf(downloadStatus); + } + +} diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java new file mode 100644 index 00000000000..edebbf780f3 --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/RpcServer.java @@ -0,0 +1,155 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.jrt.DoubleArray; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An RPC server that handles file distribution requests. + * + * @author hmusum + */ +public class RpcServer { + + private final static Logger log = Logger.getLogger(RpcServer.class.getName()); + + private final Supervisor supervisor; + private final FileDownloader downloader; + + public RpcServer(Supervisor supervisor, FileDownloader downloader) { + this.supervisor = supervisor; + this.downloader = downloader; + declareFileDistributionMethods(); + } + + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", + this, "getActiveFileReferencesStatus") + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", + this, "setFileReferencesToDownload") + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing + this, "receiveFile") + .methodDesc("receive file reference content") + .paramDesc(0, "file references", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "content", "array of bytes") + .paramDesc(3, "hash", "xx64hash of the file content") + .paramDesc(4, "errorcode", "Error code. 0 if none") + .paramDesc(5, "error-description", "Error description.") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + + //---------------- RPC methods ------------------------------------ + // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? + private static final int baseErrorCode = 0x10000; + private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + private static final int fileReferenceInternalError = fileReferenceRemoved + 1; + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getFile(Request req) { + req.detach(); + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional pathToFile = downloader.getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getActiveFileReferencesStatus(Request req) { + Map downloadStatus = downloader.downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void setFileReferencesToDownload(Request req) { + String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); + List fileReferences = Stream.of(fileReferenceStrings) + .map(FileReference::new) + .collect(Collectors.toList()); + downloader.queueForDownload(fileReferences); + + req.returnValues().add(new Int32Value(0)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String filename = req.parameters().get(1).asString(); + byte[] content = req.parameters().get(2).asData(); + long xxhash = req.parameters().get(3).asInt64(); + int errorCode = req.parameters().get(3).asInt32(); + String errorDescription = req.parameters().get(4).asString(); + + if (errorCode == 0) { + //downloader.receive(fileReference, filename, content); + req.returnValues().add(new Int32Value(0)); + } else { + log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); + req.returnValues().add(new Int32Value(1)); + // TODO: Add error description return value here too? + } + } +} diff --git a/filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java new file mode 100644 index 00000000000..626665da236 --- /dev/null +++ b/filedistribution/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -0,0 +1,237 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.text.Utf8; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.vespa.filedistribution.RpcServer; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FileDownloaderTest { + + private MockConnection connection; + private FileDownloader fileDownloader; + + @Before + public void setup() { + try { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + connection = new MockConnection(); + fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); + RpcServer rpcServer = new RpcServer(new Supervisor(new Transport()), fileDownloader); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void getFile() throws IOException { + File downloadDir = fileDownloader.downloadDirectory(); + + { + // fileReference already exists on disk, does not have to be downloaded + + String fileReferenceString = "foo"; + String filename = "foo.jar"; + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString); + FileReference fileReference = new FileReference(fileReferenceString); + writeFileReference(downloadDir, fileReferenceString, filename); + + // Check that we get correct path and content when asking for file reference + Optional pathToFile = fileDownloader.getFile(fileReference); + assertTrue(pathToFile.isPresent()); + String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath(); + assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile); + assertEquals("content", IOUtils.readFile(pathToFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file) + + connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler()); + + FileReference fileReference = new FileReference("bar"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status when unable to download + assertDownloadStatus(fileDownloader, fileReference, 0.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded) + + FileReference fileReference = new FileReference("fileReference"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status + assertDownloadStatus(fileDownloader, fileReference, 0.0); + + // Receives fileReference, should return and make it available to caller + String filename = "abc.jar"; + fileDownloader.receiveFile(fileReference, filename, Utf8.toBytes("some other content")); + Optional downloadedFile = fileDownloader.getFile(fileReference); + + assertTrue(downloadedFile.isPresent()); + File downloadedFileFullPath = new File(fileReferenceFullPath, filename); + assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); + assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + } + + @Test + public void setFilesToDownload() throws IOException { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200)); + FileReference foo = new FileReference("foo"); + FileReference bar = new FileReference("bar"); + List fileReferences = Arrays.asList(foo, bar); + fileDownloader.queueForDownload(fileReferences); + + // All requested file references should be in queue (since FileDownloader was created without ConnectionPool) + assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads())); + + // Verify download status + assertDownloadStatus(fileDownloader, foo, 0.0); + assertDownloadStatus(fileDownloader, bar, 0.0); + } + + private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { + File file = new File(new File(dir, fileReferenceString), fileName); + IOUtils.writeFile(file, "content", false); + } + + private File fileReferenceFullPath(File dir, String fileReferenceString) { + return new File(dir, fileReferenceString); + } + + private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) { + double downloadStatus = fileDownloader.downloadStatus(fileReference); + assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); + } + + private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { + + private ResponseHandler responseHandler; + + MockConnection() { + this(new FileReferenceFoundResponseHandler()); + } + + MockConnection(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + @Override + public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) { + responseHandler.request(request); + } + + @Override + public void invokeSync(Request request, double jrtTimeout) { + responseHandler.request(request); + } + + @Override + public void setError(int errorCode) { + } + + @Override + public void setSuccess() { + } + + @Override + public String getAddress() { + return null; + } + + @Override + public void close() { + } + + @Override + public void setError(Connection connection, int errorCode) { + connection.setError(errorCode); + } + + @Override + public Connection getCurrent() { + return this; + } + + @Override + public Connection setNewCurrentConnection() { + return this; + } + + @Override + public int getSize() { + return 1; + } + + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + static class FileReferenceFoundResponseHandler implements ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) { + request.returnValues().add(new Int32Value(0)); + request.returnValues().add(new StringValue("OK")); + } + } + } + + static class UnknownFileReferenceResponseHandler implements ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) { + request.returnValues().add(new Int32Value(1)); + request.returnValues().add(new StringValue("Internal error")); + } + } + } + + public interface ResponseHandler { + + void request(Request request); + + } + + } + +} -- cgit v1.2.3