From db62c71c4488db570e0a7e32fe8ab77545fe45d4 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Fri, 17 Nov 2017 12:11:02 +0100 Subject: Move fildistribution code to filedistribution module --- config-proxy/pom.xml | 9 +- .../com/yahoo/vespa/config/proxy/ProxyServer.java | 4 +- .../FileDistributionRpcServer.java | 127 ---------- .../proxy/filedistribution/FileDownloader.java | 153 ------------ .../proxy/filedistribution/FileReceiver.java | 133 ---------- .../filedistribution/FileReferenceDownload.java | 28 --- .../filedistribution/FileReferenceDownloader.java | 172 ------------- .../proxy/filedistribution/FileDownloaderTest.java | 277 --------------------- 8 files changed, 7 insertions(+), 896 deletions(-) delete mode 100644 config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java delete mode 100644 config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java delete mode 100644 config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java delete mode 100644 config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java delete mode 100644 config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java delete mode 100644 config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java (limited to 'config-proxy') diff --git a/config-proxy/pom.xml b/config-proxy/pom.xml index 8a7b6f253e2..0aadb1bbb12 100644 --- a/config-proxy/pom.xml +++ b/config-proxy/pom.xml @@ -57,10 +57,11 @@ hamcrest-core test - - com.google.guava - guava - + + com.yahoo.vespa + filedistribution + ${project.version} + diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 173d2b8a43a..28bcca9db13 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -13,8 +13,8 @@ import com.yahoo.log.event.Event; import com.yahoo.system.CatchSigTerm; import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; -import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionRpcServer; -import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileDistributionRpcServer; +import com.yahoo.vespa.filedistribution.FileDownloader; import java.util.List; import java.util.concurrent.Executors; diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java deleted file mode 100644 index b0fbc7acd33..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.proxy.filedistribution; - -import com.yahoo.config.FileReference; -import com.yahoo.jrt.DoubleArray; -import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.Method; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.StringArray; -import com.yahoo.jrt.StringValue; -import com.yahoo.jrt.Supervisor; -import com.yahoo.log.LogLevel; - -import java.io.File; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.logging.Logger; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * An RPC server that handles file distribution requests. - * - * @author hmusum - */ -public class FileDistributionRpcServer { - - private final static Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName()); - - private final Supervisor supervisor; - private final FileDownloader downloader; - - public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) { - this.supervisor = supervisor; - this.downloader = downloader; - declareFileDistributionMethods(); - } - - private void declareFileDistributionMethods() { - // Legacy method, needs to be the same name as used in filedistributor - supervisor.addMethod(new Method("waitFor", "s", "s", - this, "getFile") - .methodDesc("get path to file reference") - .paramDesc(0, "file reference", "file reference") - .returnDesc(0, "path", "path to file")); - supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", - this, "getFile") - .methodDesc("get path to file reference") - .paramDesc(0, "file reference", "file reference") - .returnDesc(0, "path", "path to file")); - supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", - this, "getActiveFileReferencesStatus") - .methodDesc("download status for file references") - .returnDesc(0, "file references", "array of file references") - .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); - supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", - this, "setFileReferencesToDownload") - .methodDesc("set which file references to download") - .paramDesc(0, "file references", "file reference to download") - .returnDesc(0, "ret", "0 if success, 1 otherwise")); - } - - - //---------------- RPC methods ------------------------------------ - // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? - private static final int baseErrorCode = 0x10000; - private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; - - private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; - private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; - private static final int fileReferenceInternalError = fileReferenceRemoved + 1; - - @SuppressWarnings({"UnusedDeclaration"}) - public final void getFile(Request req) { - req.detach(); - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); - Optional pathToFile = downloader.getFile(fileReference); - try { - if (pathToFile.isPresent()) { - req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); - } else { - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); - req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); - } - } catch (Throwable e) { - log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); - req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); - } - req.returnRequest(); - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void getActiveFileReferencesStatus(Request req) { - Map downloadStatus = downloader.downloadStatus(); - - String[] fileRefArray = new String[downloadStatus.keySet().size()]; - fileRefArray = downloadStatus.keySet().stream() - .map(FileReference::value) - .collect(Collectors.toList()) - .toArray(fileRefArray); - - double[] downloadStatusArray = new double[downloadStatus.values().size()]; - int i = 0; - for (Double d : downloadStatus.values()) { - downloadStatusArray[i++] = d; - } - - req.returnValues().add(new StringArray(fileRefArray)); - req.returnValues().add(new DoubleArray(downloadStatusArray)); - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void setFileReferencesToDownload(Request req) { - String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); - List fileReferences = Stream.of(fileReferenceStrings) - .map(FileReference::new) - .collect(Collectors.toList()); - downloader.queueForDownload(fileReferences); - - req.returnValues().add(new Int32Value(0)); - } - - -} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java deleted file mode 100644 index ac7555c7905..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.proxy.filedistribution; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.yahoo.config.FileReference; -import com.yahoo.log.LogLevel; -import com.yahoo.vespa.config.ConnectionPool; -import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.yolean.Exceptions; - -import java.io.File; -import java.time.Duration; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; - -/** - * Handles downloads of files (file references only for now) - * - * @author hmusum - */ -public class FileDownloader { - - private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); - - private final File downloadDirectory; - private final Duration timeout; - private final FileReferenceDownloader fileReferenceDownloader; - - public FileDownloader(ConnectionPool connectionPool) { - this(connectionPool, - new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), - Duration.ofMinutes(15)); - } - - FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) { - this.downloadDirectory = downloadDirectory; - this.timeout = timeout; - this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout); - } - - public Optional getFile(FileReference fileReference) { - Objects.requireNonNull(fileReference, "file reference cannot be null"); - File directory = new File(downloadDirectory, fileReference.value()); - log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); - - Optional file = getFileFromFileSystem(fileReference, directory); - if (file.isPresent()) { - return file; - } else { - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + - directory.getAbsolutePath() + ", starting download"); - return queueForDownload(fileReference, timeout); - } - } - - public void queueForDownload(List fileReferences) { - fileReferences.forEach(this::queueForDownload); - } - - public void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - fileReferenceDownloader.receiveFile(fileReference, filename, content, xxHash); - } - - double downloadStatus(FileReference fileReference) { - return fileReferenceDownloader.downloadStatus(fileReference.value()); - } - - public Map downloadStatus() { - return fileReferenceDownloader.downloadStatus(); - } - - File downloadDirectory() { - return downloadDirectory; - } - - private Optional getFileFromFileSystem(FileReference fileReference, File directory) { - File[] files = directory.listFiles(); - if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { - if (files.length != 1) { - throw new RuntimeException("More than one file in '" + fileReference.value() + - "', expected only one, unable to proceed"); - } - File file = files[0]; - if (!file.exists()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "' does not exist"); - } else if (!file.canRead()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "'exists, but unable to read it"); - } else { - fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); - return Optional.of(file); - } - } - return Optional.empty(); - } - - private synchronized Optional queueForDownload(FileReference fileReference, Duration timeout) { - if (fileReferenceDownloader.isDownloading(fileReference)) { - log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); - ListenableFuture> future = - fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); - try { - return future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + - Exceptions.toMessageString(e)); - } - } - - SettableFuture> future = SettableFuture.create(); - queueForDownload(new FileReferenceDownload(fileReference, future)); - log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); - - try { - Optional fileDownloaded; - try { - log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); - fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); - log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); - } catch (TimeoutException e) { - log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); - return Optional.empty(); - } - return fileDownloaded; - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Could not download '" + fileReference.value() + "'"); - } - } - - // We don't care about the future in this call - private synchronized void queueForDownload(FileReference fileReference) { - queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); - } - - private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { - fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); - } - - Set queuedDownloads() { - return fileReferenceDownloader.queuedDownloads(); - } - -} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java deleted file mode 100644 index e9631c445df..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.vespa.config.proxy.filedistribution; - -import com.yahoo.config.FileReference; -import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.Method; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.Supervisor; -import com.yahoo.log.LogLevel; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Logger; - -public class FileReceiver { - - private final static Logger log = Logger.getLogger(FileReceiver.class.getName()); - private final static String RECEIVE_METHOD = "filedistribution.receiveFile"; - private final static String RECEIVE_META_METHOD = "filedistribution.receiveFileMeta"; - private final static String RECEIVE_PART_METHOD = "filedistribution.receiveFilePart"; - private final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof"; - - private final Supervisor supervisor; - private final FileReferenceDownloader downloader; - private final File downloadDirectory; - private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - - public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { - this.supervisor = supervisor; - this.downloader = downloader; - this.downloadDirectory = downloadDirectory; - registerMethods(); - } - - private void registerMethods() { - receiveFileMethod(this).forEach((method) -> supervisor.addMethod(method)); - } - - // Defined here so that it can be added to supervisor used by client (server will use same connection when calling - // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method - private List receiveFileMethod(Object handler) { - List methods = new ArrayList<>(); - methods.add(new Method(RECEIVE_META_METHOD, "ssl", "ii", handler,"receiveFileMeta") - .paramDesc(0, "filereference", "file reference to download") - .paramDesc(1, "filename", "filename") - .paramDesc(2, "filelength", "length in bytes of file") - .returnDesc(0, "ret", "0 if success, 1 otherwise") - .returnDesc(1, "session-id", "Session id to be used for this transfer")); - methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", handler,"receiveFilePart") - .paramDesc(0, "filereference", "file reference to download") - .paramDesc(1, "session-id", "Session id to be used for this transfer") - .paramDesc(2, "partid", "relative part number starting at zero") - .paramDesc(3, "data", "bytes in this part") - .returnDesc(0, "ret", "0 if success, 1 otherwise")); - methods.add(new Method(RECEIVE_EOF_METHOD, "silis", "i", handler,"receiveFileEof") - .paramDesc(0, "filereference", "file reference to download") - .paramDesc(1, "session-id", "Session id to be used for this transfer") - .paramDesc(2, "crc-code", "crc code (xxhash64)") - .paramDesc(3, "error-code", "Error code. 0 if none") - .paramDesc(4, "error-description", "Error description.") - .returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise")); - // Temporary method until we have chunking - methods.add(new Method(RECEIVE_METHOD, "ssxlis", "i", handler, "receiveFile") - .methodDesc("receive file reference content") - .paramDesc(0, "file reference", "file reference to download") - .paramDesc(1, "filename", "filename") - .paramDesc(2, "content", "array of bytes") - .paramDesc(3, "hash", "xx64hash of the file content") - .paramDesc(4, "errorcode", "Error code. 0 if none") - .paramDesc(5, "error-description", "Error description.") - .returnDesc(0, "ret", "0 if success, 1 otherwise")); - return methods; - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void receiveFile(Request req) { - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - String filename = req.parameters().get(1).asString(); - byte[] content = req.parameters().get(2).asData(); - long xxhash = req.parameters().get(3).asInt64(); - int errorCode = req.parameters().get(4).asInt32(); - String errorDescription = req.parameters().get(5).asString(); - - if (errorCode == 0) { - // TODO: Remove when system test works - log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); - receiveFile(fileReference, filename, content, xxhash); - req.returnValues().add(new Int32Value(0)); - } else { - log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); - req.returnValues().add(new Int32Value(1)); - // TODO: Add error description return value here too? - } - } - - void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(content), 0); - if (xxHashFromContent != xxHash) - throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + xxHash + ")"); - - File fileReferenceDir = new File(downloadDirectory, fileReference.value()); - try { - Files.createDirectories(fileReferenceDir.toPath()); - File file = new File(fileReferenceDir, filename); - log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); - Files.write(file.toPath(), content); - downloader.completedDownloading(fileReference, file); - } catch (IOException e) { - log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); - throw new RuntimeException("Failed writing file: ", e); - } - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void receiveFileMeta(Request req) { - log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); - } - @SuppressWarnings({"UnusedDeclaration"}) - public final void receiveFilePart(Request req) { - log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); - } - @SuppressWarnings({"UnusedDeclaration"}) - public final void receiveFileEof(Request req) { - log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); - } -} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java deleted file mode 100644 index ce5a30dc7ad..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.vespa.config.proxy.filedistribution; - -import com.google.common.util.concurrent.SettableFuture; -import com.yahoo.config.FileReference; - -import java.io.File; -import java.util.Optional; - -public class FileReferenceDownload { - private final FileReference fileReference; - private final SettableFuture> future; - - FileReferenceDownload(FileReference fileReference, SettableFuture> future) { - this.fileReference = fileReference; - this.future = future; - } - - FileReference fileReference() { - return fileReference; - } - - SettableFuture> future() { - return future; - } - -} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java deleted file mode 100644 index c972cfbbf56..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.proxy.filedistribution; - -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.config.FileReference; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.StringValue; -import com.yahoo.log.LogLevel; -import com.yahoo.vespa.config.Connection; -import com.yahoo.vespa.config.ConnectionPool; - -import java.io.File; -import java.time.Duration; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -/** - * Downloads file reference using rpc requests to config server and keeps track of files being downloaded - *

- * Some methods are synchronized to make sure access to downloads is atomic - * - * @author hmusum - */ -// TODO: Add retries when a config server does not have a file reference -// TODO: Handle shutdown of executors -class FileReferenceDownloader { - - private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); - private final static Duration rpcTimeout = Duration.ofSeconds(10); - - private final ExecutorService downloadExecutor = - Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); - private ExecutorService readFromQueueExecutor = - Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); - private final ConnectionPool connectionPool; - private final ConcurrentLinkedQueue downloadQueue = new ConcurrentLinkedQueue<>(); - private final Map downloads = new LinkedHashMap<>(); - private final Map downloadStatus = new HashMap<>(); - private final Duration downloadTimeout; - private final FileReceiver fileReceiver; - - FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { - this.connectionPool = connectionPool; - this.downloadTimeout = timeout; - readFromQueueExecutor.submit(this::readFromQueue); - this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory); - } - - private synchronized Optional startDownload(FileReference fileReference, - Duration timeout, - FileReferenceDownload fileReferenceDownload) - throws ExecutionException, InterruptedException, TimeoutException { - downloads.put(fileReference, fileReferenceDownload); - setDownloadStatus(fileReference.value(), 0.0); - if (startDownloadRpc(fileReference)) - return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); - else { - fileReferenceDownload.future().setException(new RuntimeException("Failed getting file")); - downloads.remove(fileReference); - return Optional.empty(); - } - } - - synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { - downloadQueue.add(fileReferenceDownload); - } - - void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - fileReceiver.receiveFile(fileReference, filename, content, xxHash); - } - - synchronized Set queuedDownloads() { - return downloadQueue.stream() - .map(FileReferenceDownload::fileReference) - .collect(Collectors.toCollection(LinkedHashSet::new)); - } - - private void readFromQueue() { - do { - FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); - if (fileReferenceDownload == null) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { /* ignore for now */} - } else { - log.log(LogLevel.INFO, "Polling queue, found file reference '" + - fileReferenceDownload.fileReference().value() + "' to download"); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); - } - } while (true); - } - - void completedDownloading(FileReference fileReference, File file) { - if (downloads.containsKey(fileReference)) - downloads.get(fileReference).future().set(Optional.of(file)); - downloadStatus.put(fileReference, 100.0); - } - - private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { - Connection connection = connectionPool.getCurrent(); - Request request = new Request("filedistribution.serveFile"); - request.parameters().add(new StringValue(fileReference.value())); - - execute(request, connection); - if (validateResponse(request)) { - log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection); - if (request.returnValues().get(0).asInt32() == 0) - log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress()); - else - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); - return true; - } else { - log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress()); - connection.setError(request.errorCode()); - // TODO: Retry with another config server - return false; - } - } - - synchronized boolean isDownloading(FileReference fileReference) { - return downloads.containsKey(fileReference); - } - - synchronized ListenableFuture> addDownloadListener(FileReference fileReference, Runnable runnable) { - FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); - fileReferenceDownload.future().addListener(runnable, downloadExecutor); - return fileReferenceDownload.future(); - } - - private void execute(Request request, Connection connection) { - connection.invokeSync(request, (double) rpcTimeout.getSeconds()); - } - - private boolean validateResponse(Request request) { - if (request.isError()) { - return false; - } else if (request.returnValues().size() == 0) { - return false; - } else if (!request.checkReturnTypes("is")) { // TODO: Do not hard-code return type - log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage()); - return false; - } - return true; - } - - double downloadStatus(String file) { - return downloadStatus.getOrDefault(new FileReference(file), 0.0); - } - - void setDownloadStatus(String file, double percentageDownloaded) { - downloadStatus.put(new FileReference(file), percentageDownloaded); - } - - Map downloadStatus() { - return ImmutableMap.copyOf(downloadStatus); - } - -} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java deleted file mode 100644 index d1b691b9d5e..00000000000 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java +++ /dev/null @@ -1,277 +0,0 @@ -package com.yahoo.vespa.config.proxy.filedistribution; - -import com.yahoo.config.FileReference; -import com.yahoo.io.IOUtils; -import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.RequestWaiter; -import com.yahoo.jrt.StringValue; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Transport; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.config.Connection; -import com.yahoo.vespa.config.ConnectionPool; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class FileDownloaderTest { - - private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - - private MockConnection connection; - private FileDownloader fileDownloader; - private File downloadDir; - - @Before - public void setup() { - try { - downloadDir = Files.createTempDirectory("filedistribution").toFile(); - connection = new MockConnection(); - fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); - } catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void getFile() throws IOException { - File downloadDir = fileDownloader.downloadDirectory(); - - { - // fileReference already exists on disk, does not have to be downloaded - - String fileReferenceString = "foo"; - String filename = "foo.jar"; - FileReference fileReference = new FileReference(fileReferenceString); - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); - writeFileReference(downloadDir, fileReferenceString, filename); - - // Check that we get correct path and content when asking for file reference - Optional pathToFile = fileDownloader.getFile(fileReference); - assertTrue(pathToFile.isPresent()); - String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath(); - assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile); - assertEquals("content", IOUtils.readFile(pathToFile.get())); - - // Verify download status when downloaded - assertDownloadStatus(fileDownloader, fileReference, 100.0); - } - - { - // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file) - - connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler()); - - FileReference fileReference = new FileReference("bar"); - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); - assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); - - // Verify download status when unable to download - assertDownloadStatus(fileDownloader, fileReference, 0.0); - } - - { - // fileReference does not exist on disk, needs to be downloaded) - - FileReference fileReference = new FileReference("fileReference"); - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); - assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); - - // Verify download status - assertDownloadStatus(fileDownloader, fileReference, 0.0); - - // Receives fileReference, should return and make it available to caller - String filename = "abc.jar"; - receiveFile(fileReference, filename, "some other content"); - Optional downloadedFile = fileDownloader.getFile(fileReference); - - assertTrue(downloadedFile.isPresent()); - File downloadedFileFullPath = new File(fileReferenceFullPath, filename); - assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); - assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); - - // Verify download status when downloaded - assertDownloadStatus(fileDownloader, fileReference, 100.0); - } - } - - @Test - public void setFilesToDownload() throws IOException { - Duration timeout = Duration.ofMillis(200); - File downloadDir = Files.createTempDirectory("filedistribution").toFile(); - MockConnection connectionPool = new MockConnection(); - connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); - FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout); - FileReference foo = new FileReference("foo"); - FileReference bar = new FileReference("bar"); - List fileReferences = Arrays.asList(foo, bar); - fileDownloader.queueForDownload(fileReferences); - - // Verify download status - assertDownloadStatus(fileDownloader, foo, 0.0); - assertDownloadStatus(fileDownloader, bar, 0.0); - } - - @Test - public void receiveFile() throws IOException { - FileReference foo = new FileReference("foo"); - String filename = "foo.jar"; - receiveFile(foo, filename, "content"); - File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); - assertEquals("content", IOUtils.readFile(downloadedFile)); - } - - private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { - File file = new File(new File(dir, fileReferenceString), fileName); - IOUtils.writeFile(file, "content", false); - } - - private File fileReferenceFullPath(File dir, FileReference fileReference) { - return new File(dir, fileReference.value()); - } - - private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) { - double downloadStatus = fileDownloader.downloadStatus(fileReference); - assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); - } - - private void receiveFile(FileReference fileReference, String filename, String content) { - byte[] contentBytes = Utf8.toBytes(content); - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(contentBytes), 0); - fileDownloader.receiveFile(fileReference, filename, contentBytes, xxHashFromContent); - } - - private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { - - private ResponseHandler responseHandler; - - MockConnection() { - this(new FileReferenceFoundResponseHandler()); - } - - MockConnection(ResponseHandler responseHandler) { - this.responseHandler = responseHandler; - } - - @Override - public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) { - responseHandler.request(request); - } - - @Override - public void invokeSync(Request request, double jrtTimeout) { - responseHandler.request(request); - } - - @Override - public void setError(int errorCode) { - } - - @Override - public void setSuccess() { - } - - @Override - public String getAddress() { - return null; - } - - @Override - public void close() { - } - - @Override - public void setError(Connection connection, int errorCode) { - connection.setError(errorCode); - } - - @Override - public Connection getCurrent() { - return this; - } - - @Override - public Connection setNewCurrentConnection() { - return this; - } - - @Override - public int getSize() { - return 1; - } - - @Override - public Supervisor getSupervisor() { - return new Supervisor(new Transport()); - } - - void setResponseHandler(ResponseHandler responseHandler) { - this.responseHandler = responseHandler; - } - - public interface ResponseHandler { - void request(Request request); - } - - static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler { - - @Override - public void request(Request request) { - if (request.methodName().equals("filedistribution.serveFile")) { - request.returnValues().add(new Int32Value(0)); - request.returnValues().add(new StringValue("OK")); - } - } - } - - static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler { - - @Override - public void request(Request request) { - if (request.methodName().equals("filedistribution.serveFile")) { - request.returnValues().add(new Int32Value(1)); - request.returnValues().add(new StringValue("Internal error")); - } - } - } - - static class WaitResponseHandler implements MockConnection.ResponseHandler { - - private final Duration waitUntilAnswering; - - WaitResponseHandler(Duration waitUntilAnswering) { - super(); - this.waitUntilAnswering = waitUntilAnswering; - } - - @Override - public void request(Request request) { - try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ } - - if (request.methodName().equals("filedistribution.serveFile")) { - request.returnValues().add(new Int32Value(0)); - request.returnValues().add(new StringValue("OK")); - } - - } - } - } - -} -- cgit v1.2.3