diff options
author | Harald Musum <musum@yahoo-inc.com> | 2017-11-16 22:44:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-16 22:44:40 +0100 |
commit | b0fd50fc89525b272bd3694594593a288abcc790 (patch) | |
tree | 797fc2b68976a28ff60e506dae31c9c574d209ca /config-proxy | |
parent | 2f423237545ef76a4527804a78fb9a6f48ec5ef4 (diff) |
Revert "Revert "Revert "Hmusum/move new filedistribution classes"""
Diffstat (limited to 'config-proxy')
8 files changed, 896 insertions, 11 deletions
diff --git a/config-proxy/pom.xml b/config-proxy/pom.xml index 13e2ddf40e8..8a7b6f253e2 100644 --- a/config-proxy/pom.xml +++ b/config-proxy/pom.xml @@ -57,15 +57,10 @@ <artifactId>hamcrest-core</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>filedistribution</artifactId> - <version>${project.version}</version> - </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> <build> <plugins> diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 28bcca9db13..173d2b8a43a 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -13,8 +13,8 @@ import com.yahoo.log.event.Event; import com.yahoo.system.CatchSigTerm; import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; -import com.yahoo.vespa.filedistribution.FileDistributionRpcServer; -import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionRpcServer; +import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader; import java.util.List; import java.util.concurrent.Executors; diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java new file mode 100644 index 00000000000..b0fbc7acd33 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java @@ -0,0 +1,127 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.jrt.DoubleArray; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.log.LogLevel; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An RPC server that handles file distribution requests. + * + * @author hmusum + */ +public class FileDistributionRpcServer { + + private final static Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName()); + + private final Supervisor supervisor; + private final FileDownloader downloader; + + public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) { + this.supervisor = supervisor; + this.downloader = downloader; + declareFileDistributionMethods(); + } + + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", + this, "getActiveFileReferencesStatus") + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", + this, "setFileReferencesToDownload") + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + + + //---------------- RPC methods ------------------------------------ + // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? + private static final int baseErrorCode = 0x10000; + private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + private static final int fileReferenceInternalError = fileReferenceRemoved + 1; + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getFile(Request req) { + req.detach(); + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = downloader.getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getActiveFileReferencesStatus(Request req) { + Map<FileReference, Double> downloadStatus = downloader.downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void setFileReferencesToDownload(Request req) { + String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); + List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + .map(FileReference::new) + .collect(Collectors.toList()); + downloader.queueForDownload(fileReferences); + + req.returnValues().add(new Int32Value(0)); + } + + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java new file mode 100644 index 00000000000..ac7555c7905 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java @@ -0,0 +1,153 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; + +import java.io.File; +import java.time.Duration; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +/** + * Handles downloads of files (file references only for now) + * + * @author hmusum + */ +public class FileDownloader { + + private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); + + private final File downloadDirectory; + private final Duration timeout; + private final FileReferenceDownloader fileReferenceDownloader; + + public FileDownloader(ConnectionPool connectionPool) { + this(connectionPool, + new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), + Duration.ofMinutes(15)); + } + + FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) { + this.downloadDirectory = downloadDirectory; + this.timeout = timeout; + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout); + } + + public Optional<File> getFile(FileReference fileReference) { + Objects.requireNonNull(fileReference, "file reference cannot be null"); + File directory = new File(downloadDirectory, fileReference.value()); + log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); + + Optional<File> file = getFileFromFileSystem(fileReference, directory); + if (file.isPresent()) { + return file; + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + + directory.getAbsolutePath() + ", starting download"); + return queueForDownload(fileReference, timeout); + } + } + + public void queueForDownload(List<FileReference> fileReferences) { + fileReferences.forEach(this::queueForDownload); + } + + public void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { + fileReferenceDownloader.receiveFile(fileReference, filename, content, xxHash); + } + + double downloadStatus(FileReference fileReference) { + return fileReferenceDownloader.downloadStatus(fileReference.value()); + } + + public Map<FileReference, Double> downloadStatus() { + return fileReferenceDownloader.downloadStatus(); + } + + File downloadDirectory() { + return downloadDirectory; + } + + private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) { + File[] files = directory.listFiles(); + if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + if (files.length != 1) { + throw new RuntimeException("More than one file in '" + fileReference.value() + + "', expected only one, unable to proceed"); + } + File file = files[0]; + if (!file.exists()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "' does not exist"); + } else if (!file.canRead()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "'exists, but unable to read it"); + } else { + fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); + return Optional.of(file); + } + } + return Optional.empty(); + } + + private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) { + if (fileReferenceDownloader.isDownloading(fileReference)) { + log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); + ListenableFuture<Optional<File>> future = + fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + + Exceptions.toMessageString(e)); + } + } + + SettableFuture<Optional<File>> future = SettableFuture.create(); + queueForDownload(new FileReferenceDownload(fileReference, future)); + log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); + + try { + Optional<File> fileDownloaded; + try { + log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); + fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); + log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); + } catch (TimeoutException e) { + log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); + return Optional.empty(); + } + return fileDownloaded; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Could not download '" + fileReference.value() + "'"); + } + } + + // We don't care about the future in this call + private synchronized void queueForDownload(FileReference fileReference) { + queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); + } + + private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { + fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); + } + + Set<FileReference> queuedDownloads() { + return fileReferenceDownloader.queuedDownloads(); + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java new file mode 100644 index 00000000000..e9631c445df --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java @@ -0,0 +1,133 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Supervisor; +import com.yahoo.log.LogLevel; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +public class FileReceiver { + + private final static Logger log = Logger.getLogger(FileReceiver.class.getName()); + private final static String RECEIVE_METHOD = "filedistribution.receiveFile"; + private final static String RECEIVE_META_METHOD = "filedistribution.receiveFileMeta"; + private final static String RECEIVE_PART_METHOD = "filedistribution.receiveFilePart"; + private final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof"; + + private final Supervisor supervisor; + private final FileReferenceDownloader downloader; + private final File downloadDirectory; + private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + + public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { + this.supervisor = supervisor; + this.downloader = downloader; + this.downloadDirectory = downloadDirectory; + registerMethods(); + } + + private void registerMethods() { + receiveFileMethod(this).forEach((method) -> supervisor.addMethod(method)); + } + + // Defined here so that it can be added to supervisor used by client (server will use same connection when calling + // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method + private List<Method> receiveFileMethod(Object handler) { + List<Method> methods = new ArrayList<>(); + methods.add(new Method(RECEIVE_META_METHOD, "ssl", "ii", handler,"receiveFileMeta") + .paramDesc(0, "filereference", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "filelength", "length in bytes of file") + .returnDesc(0, "ret", "0 if success, 1 otherwise") + .returnDesc(1, "session-id", "Session id to be used for this transfer")); + methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", handler,"receiveFilePart") + .paramDesc(0, "filereference", "file reference to download") + .paramDesc(1, "session-id", "Session id to be used for this transfer") + .paramDesc(2, "partid", "relative part number starting at zero") + .paramDesc(3, "data", "bytes in this part") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + methods.add(new Method(RECEIVE_EOF_METHOD, "silis", "i", handler,"receiveFileEof") + .paramDesc(0, "filereference", "file reference to download") + .paramDesc(1, "session-id", "Session id to be used for this transfer") + .paramDesc(2, "crc-code", "crc code (xxhash64)") + .paramDesc(3, "error-code", "Error code. 0 if none") + .paramDesc(4, "error-description", "Error description.") + .returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise")); + // Temporary method until we have chunking + methods.add(new Method(RECEIVE_METHOD, "ssxlis", "i", handler, "receiveFile") + .methodDesc("receive file reference content") + .paramDesc(0, "file reference", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "content", "array of bytes") + .paramDesc(3, "hash", "xx64hash of the file content") + .paramDesc(4, "errorcode", "Error code. 0 if none") + .paramDesc(5, "error-description", "Error description.") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + return methods; + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String filename = req.parameters().get(1).asString(); + byte[] content = req.parameters().get(2).asData(); + long xxhash = req.parameters().get(3).asInt64(); + int errorCode = req.parameters().get(4).asInt32(); + String errorDescription = req.parameters().get(5).asString(); + + if (errorCode == 0) { + // TODO: Remove when system test works + log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); + receiveFile(fileReference, filename, content, xxhash); + req.returnValues().add(new Int32Value(0)); + } else { + log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); + req.returnValues().add(new Int32Value(1)); + // TODO: Add error description return value here too? + } + } + + void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { + long xxHashFromContent = hasher.hash(ByteBuffer.wrap(content), 0); + if (xxHashFromContent != xxHash) + throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + xxHash + ")"); + + File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + File file = new File(fileReferenceDir, filename); + log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); + Files.write(file.toPath(), content); + downloader.completedDownloading(fileReference, file); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); + throw new RuntimeException("Failed writing file: ", e); + } + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFileMeta(Request req) { + log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + } + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFilePart(Request req) { + log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + } + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFileEof(Request req) { + log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java new file mode 100644 index 00000000000..ce5a30dc7ad --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java @@ -0,0 +1,28 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; + +import java.io.File; +import java.util.Optional; + +public class FileReferenceDownload { + private final FileReference fileReference; + private final SettableFuture<Optional<File>> future; + + FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) { + this.fileReference = fileReference; + this.future = future; + } + + FileReference fileReference() { + return fileReference; + } + + SettableFuture<Optional<File>> future() { + return future; + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java new file mode 100644 index 00000000000..c972cfbbf56 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -0,0 +1,172 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.FileReference; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringValue; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Downloads file reference using rpc requests to config server and keeps track of files being downloaded + * <p> + * Some methods are synchronized to make sure access to downloads is atomic + * + * @author hmusum + */ +// TODO: Add retries when a config server does not have a file reference +// TODO: Handle shutdown of executors +class FileReferenceDownloader { + + private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); + private final static Duration rpcTimeout = Duration.ofSeconds(10); + + private final ExecutorService downloadExecutor = + Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); + private ExecutorService readFromQueueExecutor = + Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); + private final ConnectionPool connectionPool; + private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>(); + private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); + private final Map<FileReference, Double> downloadStatus = new HashMap<>(); + private final Duration downloadTimeout; + private final FileReceiver fileReceiver; + + FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { + this.connectionPool = connectionPool; + this.downloadTimeout = timeout; + readFromQueueExecutor.submit(this::readFromQueue); + this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory); + } + + private synchronized Optional<File> startDownload(FileReference fileReference, + Duration timeout, + FileReferenceDownload fileReferenceDownload) + throws ExecutionException, InterruptedException, TimeoutException { + downloads.put(fileReference, fileReferenceDownload); + setDownloadStatus(fileReference.value(), 0.0); + if (startDownloadRpc(fileReference)) + return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); + else { + fileReferenceDownload.future().setException(new RuntimeException("Failed getting file")); + downloads.remove(fileReference); + return Optional.empty(); + } + } + + synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { + downloadQueue.add(fileReferenceDownload); + } + + void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { + fileReceiver.receiveFile(fileReference, filename, content, xxHash); + } + + synchronized Set<FileReference> queuedDownloads() { + return downloadQueue.stream() + .map(FileReferenceDownload::fileReference) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private void readFromQueue() { + do { + FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); + if (fileReferenceDownload == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { /* ignore for now */} + } else { + log.log(LogLevel.INFO, "Polling queue, found file reference '" + + fileReferenceDownload.fileReference().value() + "' to download"); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + } + } while (true); + } + + void completedDownloading(FileReference fileReference, File file) { + if (downloads.containsKey(fileReference)) + downloads.get(fileReference).future().set(Optional.of(file)); + downloadStatus.put(fileReference, 100.0); + } + + private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { + Connection connection = connectionPool.getCurrent(); + Request request = new Request("filedistribution.serveFile"); + request.parameters().add(new StringValue(fileReference.value())); + + execute(request, connection); + if (validateResponse(request)) { + log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection); + if (request.returnValues().get(0).asInt32() == 0) + log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress()); + else + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); + return true; + } else { + log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress()); + connection.setError(request.errorCode()); + // TODO: Retry with another config server + return false; + } + } + + synchronized boolean isDownloading(FileReference fileReference) { + return downloads.containsKey(fileReference); + } + + synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); + fileReferenceDownload.future().addListener(runnable, downloadExecutor); + return fileReferenceDownload.future(); + } + + private void execute(Request request, Connection connection) { + connection.invokeSync(request, (double) rpcTimeout.getSeconds()); + } + + private boolean validateResponse(Request request) { + if (request.isError()) { + return false; + } else if (request.returnValues().size() == 0) { + return false; + } else if (!request.checkReturnTypes("is")) { // TODO: Do not hard-code return type + log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage()); + return false; + } + return true; + } + + double downloadStatus(String file) { + return downloadStatus.getOrDefault(new FileReference(file), 0.0); + } + + void setDownloadStatus(String file, double percentageDownloaded) { + downloadStatus.put(new FileReference(file), percentageDownloaded); + } + + Map<FileReference, Double> downloadStatus() { + return ImmutableMap.copyOf(downloadStatus); + } + +} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java new file mode 100644 index 00000000000..d1b691b9d5e --- /dev/null +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -0,0 +1,277 @@ +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.text.Utf8; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FileDownloaderTest { + + private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + + private MockConnection connection; + private FileDownloader fileDownloader; + private File downloadDir; + + @Before + public void setup() { + try { + downloadDir = Files.createTempDirectory("filedistribution").toFile(); + connection = new MockConnection(); + fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void getFile() throws IOException { + File downloadDir = fileDownloader.downloadDirectory(); + + { + // fileReference already exists on disk, does not have to be downloaded + + String fileReferenceString = "foo"; + String filename = "foo.jar"; + FileReference fileReference = new FileReference(fileReferenceString); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + writeFileReference(downloadDir, fileReferenceString, filename); + + // Check that we get correct path and content when asking for file reference + Optional<File> pathToFile = fileDownloader.getFile(fileReference); + assertTrue(pathToFile.isPresent()); + String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath(); + assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile); + assertEquals("content", IOUtils.readFile(pathToFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file) + + connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler()); + + FileReference fileReference = new FileReference("bar"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status when unable to download + assertDownloadStatus(fileDownloader, fileReference, 0.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded) + + FileReference fileReference = new FileReference("fileReference"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status + assertDownloadStatus(fileDownloader, fileReference, 0.0); + + // Receives fileReference, should return and make it available to caller + String filename = "abc.jar"; + receiveFile(fileReference, filename, "some other content"); + Optional<File> downloadedFile = fileDownloader.getFile(fileReference); + + assertTrue(downloadedFile.isPresent()); + File downloadedFileFullPath = new File(fileReferenceFullPath, filename); + assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); + assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + } + + @Test + public void setFilesToDownload() throws IOException { + Duration timeout = Duration.ofMillis(200); + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + MockConnection connectionPool = new MockConnection(); + connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); + FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout); + FileReference foo = new FileReference("foo"); + FileReference bar = new FileReference("bar"); + List<FileReference> fileReferences = Arrays.asList(foo, bar); + fileDownloader.queueForDownload(fileReferences); + + // Verify download status + assertDownloadStatus(fileDownloader, foo, 0.0); + assertDownloadStatus(fileDownloader, bar, 0.0); + } + + @Test + public void receiveFile() throws IOException { + FileReference foo = new FileReference("foo"); + String filename = "foo.jar"; + receiveFile(foo, filename, "content"); + File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); + assertEquals("content", IOUtils.readFile(downloadedFile)); + } + + private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { + File file = new File(new File(dir, fileReferenceString), fileName); + IOUtils.writeFile(file, "content", false); + } + + private File fileReferenceFullPath(File dir, FileReference fileReference) { + return new File(dir, fileReference.value()); + } + + private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) { + double downloadStatus = fileDownloader.downloadStatus(fileReference); + assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); + } + + private void receiveFile(FileReference fileReference, String filename, String content) { + byte[] contentBytes = Utf8.toBytes(content); + long xxHashFromContent = hasher.hash(ByteBuffer.wrap(contentBytes), 0); + fileDownloader.receiveFile(fileReference, filename, contentBytes, xxHashFromContent); + } + + private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { + + private ResponseHandler responseHandler; + + MockConnection() { + this(new FileReferenceFoundResponseHandler()); + } + + MockConnection(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + @Override + public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) { + responseHandler.request(request); + } + + @Override + public void invokeSync(Request request, double jrtTimeout) { + responseHandler.request(request); + } + + @Override + public void setError(int errorCode) { + } + + @Override + public void setSuccess() { + } + + @Override + public String getAddress() { + return null; + } + + @Override + public void close() { + } + + @Override + public void setError(Connection connection, int errorCode) { + connection.setError(errorCode); + } + + @Override + public Connection getCurrent() { + return this; + } + + @Override + public Connection setNewCurrentConnection() { + return this; + } + + @Override + public int getSize() { + return 1; + } + + @Override + public Supervisor getSupervisor() { + return new Supervisor(new Transport()); + } + + void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + public interface ResponseHandler { + void request(Request request); + } + + static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) { + request.returnValues().add(new Int32Value(0)); + request.returnValues().add(new StringValue("OK")); + } + } + } + + static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) { + request.returnValues().add(new Int32Value(1)); + request.returnValues().add(new StringValue("Internal error")); + } + } + } + + static class WaitResponseHandler implements MockConnection.ResponseHandler { + + private final Duration waitUntilAnswering; + + WaitResponseHandler(Duration waitUntilAnswering) { + super(); + this.waitUntilAnswering = waitUntilAnswering; + } + + @Override + public void request(Request request) { + try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ } + + if (request.methodName().equals("filedistribution.serveFile")) { + request.returnValues().add(new Int32Value(0)); + request.returnValues().add(new StringValue("OK")); + } + + } + } + } + +} |