aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@yahoo-inc.com>2017-11-16 12:21:37 +0100
committerGitHub <noreply@github.com>2017-11-16 12:21:37 +0100
commit89d90b7503d61954bc24e68c54d8492d3eafd007 (patch)
tree1e3effbe29d0d3facdae5914753ef3273932a692 /config-proxy
parent7a6772d914b0de0ecf683f1233c349e34067ec37 (diff)
Revert "Hmusum/move new filedistribution classes"
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/pom.xml13
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java4
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java127
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java153
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java133
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java28
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java172
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java277
8 files changed, 896 insertions, 11 deletions
diff --git a/config-proxy/pom.xml b/config-proxy/pom.xml
index 13e2ddf40e8..8a7b6f253e2 100644
--- a/config-proxy/pom.xml
+++ b/config-proxy/pom.xml
@@ -57,15 +57,10 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>filedistribution</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index 28bcca9db13..173d2b8a43a 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -13,8 +13,8 @@ import com.yahoo.log.event.Event;
import com.yahoo.system.CatchSigTerm;
import com.yahoo.vespa.config.*;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
-import com.yahoo.vespa.filedistribution.FileDistributionRpcServer;
-import com.yahoo.vespa.filedistribution.FileDownloader;
+import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionRpcServer;
+import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader;
import java.util.List;
import java.util.concurrent.Executors;
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
new file mode 100644
index 00000000000..b0fbc7acd33
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -0,0 +1,127 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.DoubleArray;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringArray;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * An RPC server that handles file distribution requests.
+ *
+ * @author hmusum
+ */
+public class FileDistributionRpcServer {
+
+ private final static Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName());
+
+ private final Supervisor supervisor;
+ private final FileDownloader downloader;
+
+ public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ declareFileDistributionMethods();
+ }
+
+ private void declareFileDistributionMethods() {
+ // Legacy method, needs to be the same name as used in filedistributor
+ supervisor.addMethod(new Method("waitFor", "s", "s",
+ this, "getFile")
+ .methodDesc("get path to file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getFile", "s", "s",
+ this, "getFile")
+ .methodDesc("get path to file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD",
+ this, "getActiveFileReferencesStatus")
+ .methodDesc("download status for file references")
+ .returnDesc(0, "file references", "array of file references")
+ .returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
+ supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i",
+ this, "setFileReferencesToDownload")
+ .methodDesc("set which file references to download")
+ .paramDesc(0, "file references", "file reference to download")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ }
+
+
+ //---------------- RPC methods ------------------------------------
+ // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
+ private static final int baseErrorCode = 0x10000;
+ private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000;
+
+ private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode;
+ private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1;
+ private static final int fileReferenceInternalError = fileReferenceRemoved + 1;
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getFile(Request req) {
+ req.detach();
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'");
+ Optional<File> pathToFile = downloader.getFile(fileReference);
+ try {
+ if (pathToFile.isPresent()) {
+ req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath()));
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get());
+ } else {
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
+ req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
+ }
+ } catch (Throwable e) {
+ log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage());
+ req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed");
+ }
+ req.returnRequest();
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getActiveFileReferencesStatus(Request req) {
+ Map<FileReference, Double> downloadStatus = downloader.downloadStatus();
+
+ String[] fileRefArray = new String[downloadStatus.keySet().size()];
+ fileRefArray = downloadStatus.keySet().stream()
+ .map(FileReference::value)
+ .collect(Collectors.toList())
+ .toArray(fileRefArray);
+
+ double[] downloadStatusArray = new double[downloadStatus.values().size()];
+ int i = 0;
+ for (Double d : downloadStatus.values()) {
+ downloadStatusArray[i++] = d;
+ }
+
+ req.returnValues().add(new StringArray(fileRefArray));
+ req.returnValues().add(new DoubleArray(downloadStatusArray));
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void setFileReferencesToDownload(Request req) {
+ String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
+ List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
+ .map(FileReference::new)
+ .collect(Collectors.toList());
+ downloader.queueForDownload(fileReferences);
+
+ req.returnValues().add(new Int32Value(0));
+ }
+
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
new file mode 100644
index 00000000000..ac7555c7905
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
@@ -0,0 +1,153 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.config.FileReference;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.ConnectionPool;
+import com.yahoo.vespa.defaults.Defaults;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.File;
+import java.time.Duration;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+/**
+ * Handles downloads of files (file references only for now)
+ *
+ * @author hmusum
+ */
+public class FileDownloader {
+
+ private final static Logger log = Logger.getLogger(FileDownloader.class.getName());
+
+ private final File downloadDirectory;
+ private final Duration timeout;
+ private final FileReferenceDownloader fileReferenceDownloader;
+
+ public FileDownloader(ConnectionPool connectionPool) {
+ this(connectionPool,
+ new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")),
+ Duration.ofMinutes(15));
+ }
+
+ FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) {
+ this.downloadDirectory = downloadDirectory;
+ this.timeout = timeout;
+ this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout);
+ }
+
+ public Optional<File> getFile(FileReference fileReference) {
+ Objects.requireNonNull(fileReference, "file reference cannot be null");
+ File directory = new File(downloadDirectory, fileReference.value());
+ log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
+
+ Optional<File> file = getFileFromFileSystem(fileReference, directory);
+ if (file.isPresent()) {
+ return file;
+ } else {
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
+ directory.getAbsolutePath() + ", starting download");
+ return queueForDownload(fileReference, timeout);
+ }
+ }
+
+ public void queueForDownload(List<FileReference> fileReferences) {
+ fileReferences.forEach(this::queueForDownload);
+ }
+
+ public void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) {
+ fileReferenceDownloader.receiveFile(fileReference, filename, content, xxHash);
+ }
+
+ double downloadStatus(FileReference fileReference) {
+ return fileReferenceDownloader.downloadStatus(fileReference.value());
+ }
+
+ public Map<FileReference, Double> downloadStatus() {
+ return fileReferenceDownloader.downloadStatus();
+ }
+
+ File downloadDirectory() {
+ return downloadDirectory;
+ }
+
+ private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) {
+ File[] files = directory.listFiles();
+ if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
+ if (files.length != 1) {
+ throw new RuntimeException("More than one file in '" + fileReference.value() +
+ "', expected only one, unable to proceed");
+ }
+ File file = files[0];
+ if (!file.exists()) {
+ throw new RuntimeException("File with reference '" + fileReference.value() +
+ "' does not exist");
+ } else if (!file.canRead()) {
+ throw new RuntimeException("File with reference '" + fileReference.value() +
+ "'exists, but unable to read it");
+ } else {
+ fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0);
+ return Optional.of(file);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) {
+ if (fileReferenceDownloader.isDownloading(fileReference)) {
+ log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
+ ListenableFuture<Optional<File>> future =
+ fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " +
+ Exceptions.toMessageString(e));
+ }
+ }
+
+ SettableFuture<Optional<File>> future = SettableFuture.create();
+ queueForDownload(new FileReferenceDownload(fileReference, future));
+ log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
+
+ try {
+ Optional<File> fileDownloaded;
+ try {
+ log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
+ fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
+ log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
+ } catch (TimeoutException e) {
+ log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
+ return Optional.empty();
+ }
+ return fileDownloaded;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Could not download '" + fileReference.value() + "'");
+ }
+ }
+
+ // We don't care about the future in this call
+ private synchronized void queueForDownload(FileReference fileReference) {
+ queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
+ }
+
+ private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) {
+ fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
+ }
+
+ Set<FileReference> queuedDownloads() {
+ return fileReferenceDownloader.queuedDownloads();
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
new file mode 100644
index 00000000000..e9631c445df
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
@@ -0,0 +1,133 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+public class FileReceiver {
+
+ private final static Logger log = Logger.getLogger(FileReceiver.class.getName());
+ private final static String RECEIVE_METHOD = "filedistribution.receiveFile";
+ private final static String RECEIVE_META_METHOD = "filedistribution.receiveFileMeta";
+ private final static String RECEIVE_PART_METHOD = "filedistribution.receiveFilePart";
+ private final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof";
+
+ private final Supervisor supervisor;
+ private final FileReferenceDownloader downloader;
+ private final File downloadDirectory;
+ private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
+
+ public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ this.downloadDirectory = downloadDirectory;
+ registerMethods();
+ }
+
+ private void registerMethods() {
+ receiveFileMethod(this).forEach((method) -> supervisor.addMethod(method));
+ }
+
+ // Defined here so that it can be added to supervisor used by client (server will use same connection when calling
+ // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method
+ private List<Method> receiveFileMethod(Object handler) {
+ List<Method> methods = new ArrayList<>();
+ methods.add(new Method(RECEIVE_META_METHOD, "ssl", "ii", handler,"receiveFileMeta")
+ .paramDesc(0, "filereference", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "filelength", "length in bytes of file")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise")
+ .returnDesc(1, "session-id", "Session id to be used for this transfer"));
+ methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", handler,"receiveFilePart")
+ .paramDesc(0, "filereference", "file reference to download")
+ .paramDesc(1, "session-id", "Session id to be used for this transfer")
+ .paramDesc(2, "partid", "relative part number starting at zero")
+ .paramDesc(3, "data", "bytes in this part")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ methods.add(new Method(RECEIVE_EOF_METHOD, "silis", "i", handler,"receiveFileEof")
+ .paramDesc(0, "filereference", "file reference to download")
+ .paramDesc(1, "session-id", "Session id to be used for this transfer")
+ .paramDesc(2, "crc-code", "crc code (xxhash64)")
+ .paramDesc(3, "error-code", "Error code. 0 if none")
+ .paramDesc(4, "error-description", "Error description.")
+ .returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise"));
+ // Temporary method until we have chunking
+ methods.add(new Method(RECEIVE_METHOD, "ssxlis", "i", handler, "receiveFile")
+ .methodDesc("receive file reference content")
+ .paramDesc(0, "file reference", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "content", "array of bytes")
+ .paramDesc(3, "hash", "xx64hash of the file content")
+ .paramDesc(4, "errorcode", "Error code. 0 if none")
+ .paramDesc(5, "error-description", "Error description.")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ return methods;
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String filename = req.parameters().get(1).asString();
+ byte[] content = req.parameters().get(2).asData();
+ long xxhash = req.parameters().get(3).asInt64();
+ int errorCode = req.parameters().get(4).asInt32();
+ String errorDescription = req.parameters().get(5).asString();
+
+ if (errorCode == 0) {
+ // TODO: Remove when system test works
+ log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
+ receiveFile(fileReference, filename, content, xxhash);
+ req.returnValues().add(new Int32Value(0));
+ } else {
+ log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
+ req.returnValues().add(new Int32Value(1));
+ // TODO: Add error description return value here too?
+ }
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) {
+ long xxHashFromContent = hasher.hash(ByteBuffer.wrap(content), 0);
+ if (xxHashFromContent != xxHash)
+ throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + xxHash + ")");
+
+ File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ File file = new File(fileReferenceDir, filename);
+ log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
+ Files.write(file.toPath(), content);
+ downloader.completedDownloading(fileReference, file);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
+ throw new RuntimeException("Failed writing file: ", e);
+ }
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFileMeta(Request req) {
+ log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+ }
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFilePart(Request req) {
+ log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+ }
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFileEof(Request req) {
+ log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java
new file mode 100644
index 00000000000..ce5a30dc7ad
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java
@@ -0,0 +1,28 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.config.FileReference;
+
+import java.io.File;
+import java.util.Optional;
+
+public class FileReferenceDownload {
+ private final FileReference fileReference;
+ private final SettableFuture<Optional<File>> future;
+
+ FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) {
+ this.fileReference = fileReference;
+ this.future = future;
+ }
+
+ FileReference fileReference() {
+ return fileReference;
+ }
+
+ SettableFuture<Optional<File>> future() {
+ return future;
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
new file mode 100644
index 00000000000..c972cfbbf56
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -0,0 +1,172 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.Connection;
+import com.yahoo.vespa.config.ConnectionPool;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * Downloads file reference using rpc requests to config server and keeps track of files being downloaded
+ * <p>
+ * Some methods are synchronized to make sure access to downloads is atomic
+ *
+ * @author hmusum
+ */
+// TODO: Add retries when a config server does not have a file reference
+// TODO: Handle shutdown of executors
+class FileReferenceDownloader {
+
+ private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
+ private final static Duration rpcTimeout = Duration.ofSeconds(10);
+
+ private final ExecutorService downloadExecutor =
+ Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
+ private ExecutorService readFromQueueExecutor =
+ Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue"));
+ private final ConnectionPool connectionPool;
+ private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>();
+ private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
+ private final Map<FileReference, Double> downloadStatus = new HashMap<>();
+ private final Duration downloadTimeout;
+ private final FileReceiver fileReceiver;
+
+ FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
+ this.connectionPool = connectionPool;
+ this.downloadTimeout = timeout;
+ readFromQueueExecutor.submit(this::readFromQueue);
+ this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
+ }
+
+ private synchronized Optional<File> startDownload(FileReference fileReference,
+ Duration timeout,
+ FileReferenceDownload fileReferenceDownload)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ downloads.put(fileReference, fileReferenceDownload);
+ setDownloadStatus(fileReference.value(), 0.0);
+ if (startDownloadRpc(fileReference))
+ return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ else {
+ fileReferenceDownload.future().setException(new RuntimeException("Failed getting file"));
+ downloads.remove(fileReference);
+ return Optional.empty();
+ }
+ }
+
+ synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
+ downloadQueue.add(fileReferenceDownload);
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) {
+ fileReceiver.receiveFile(fileReference, filename, content, xxHash);
+ }
+
+ synchronized Set<FileReference> queuedDownloads() {
+ return downloadQueue.stream()
+ .map(FileReferenceDownload::fileReference)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ private void readFromQueue() {
+ do {
+ FileReferenceDownload fileReferenceDownload = downloadQueue.poll();
+ if (fileReferenceDownload == null) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) { /* ignore for now */}
+ } else {
+ log.log(LogLevel.INFO, "Polling queue, found file reference '" +
+ fileReferenceDownload.fileReference().value() + "' to download");
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
+ }
+ } while (true);
+ }
+
+ void completedDownloading(FileReference fileReference, File file) {
+ if (downloads.containsKey(fileReference))
+ downloads.get(fileReference).future().set(Optional.of(file));
+ downloadStatus.put(fileReference, 100.0);
+ }
+
+ private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
+ Connection connection = connectionPool.getCurrent();
+ Request request = new Request("filedistribution.serveFile");
+ request.parameters().add(new StringValue(fileReference.value()));
+
+ execute(request, connection);
+ if (validateResponse(request)) {
+ log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection);
+ if (request.returnValues().get(0).asInt32() == 0)
+ log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress());
+ else
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress());
+ return true;
+ } else {
+ log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress());
+ connection.setError(request.errorCode());
+ // TODO: Retry with another config server
+ return false;
+ }
+ }
+
+ synchronized boolean isDownloading(FileReference fileReference) {
+ return downloads.containsKey(fileReference);
+ }
+
+ synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ FileReferenceDownload fileReferenceDownload = downloads.get(fileReference);
+ fileReferenceDownload.future().addListener(runnable, downloadExecutor);
+ return fileReferenceDownload.future();
+ }
+
+ private void execute(Request request, Connection connection) {
+ connection.invokeSync(request, (double) rpcTimeout.getSeconds());
+ }
+
+ private boolean validateResponse(Request request) {
+ if (request.isError()) {
+ return false;
+ } else if (request.returnValues().size() == 0) {
+ return false;
+ } else if (!request.checkReturnTypes("is")) { // TODO: Do not hard-code return type
+ log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage());
+ return false;
+ }
+ return true;
+ }
+
+ double downloadStatus(String file) {
+ return downloadStatus.getOrDefault(new FileReference(file), 0.0);
+ }
+
+ void setDownloadStatus(String file, double percentageDownloaded) {
+ downloadStatus.put(new FileReference(file), percentageDownloaded);
+ }
+
+ Map<FileReference, Double> downloadStatus() {
+ return ImmutableMap.copyOf(downloadStatus);
+ }
+
+}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
new file mode 100644
index 00000000000..d1b691b9d5e
--- /dev/null
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -0,0 +1,277 @@
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.io.IOUtils;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Transport;
+import com.yahoo.text.Utf8;
+import com.yahoo.vespa.config.Connection;
+import com.yahoo.vespa.config.ConnectionPool;
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FileDownloaderTest {
+
+ private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
+
+ private MockConnection connection;
+ private FileDownloader fileDownloader;
+ private File downloadDir;
+
+ @Before
+ public void setup() {
+ try {
+ downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ connection = new MockConnection();
+ fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void getFile() throws IOException {
+ File downloadDir = fileDownloader.downloadDirectory();
+
+ {
+ // fileReference already exists on disk, does not have to be downloaded
+
+ String fileReferenceString = "foo";
+ String filename = "foo.jar";
+ FileReference fileReference = new FileReference(fileReferenceString);
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference);
+ writeFileReference(downloadDir, fileReferenceString, filename);
+
+ // Check that we get correct path and content when asking for file reference
+ Optional<File> pathToFile = fileDownloader.getFile(fileReference);
+ assertTrue(pathToFile.isPresent());
+ String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath();
+ assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile);
+ assertEquals("content", IOUtils.readFile(pathToFile.get()));
+
+ // Verify download status when downloaded
+ assertDownloadStatus(fileDownloader, fileReference, 100.0);
+ }
+
+ {
+ // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file)
+
+ connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler());
+
+ FileReference fileReference = new FileReference("bar");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference);
+ assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
+
+ // Verify download status when unable to download
+ assertDownloadStatus(fileDownloader, fileReference, 0.0);
+ }
+
+ {
+ // fileReference does not exist on disk, needs to be downloaded)
+
+ FileReference fileReference = new FileReference("fileReference");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference);
+ assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
+
+ // Verify download status
+ assertDownloadStatus(fileDownloader, fileReference, 0.0);
+
+ // Receives fileReference, should return and make it available to caller
+ String filename = "abc.jar";
+ receiveFile(fileReference, filename, "some other content");
+ Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
+
+ assertTrue(downloadedFile.isPresent());
+ File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
+ assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath());
+ assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
+
+ // Verify download status when downloaded
+ assertDownloadStatus(fileDownloader, fileReference, 100.0);
+ }
+ }
+
+ @Test
+ public void setFilesToDownload() throws IOException {
+ Duration timeout = Duration.ofMillis(200);
+ File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ MockConnection connectionPool = new MockConnection();
+ connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
+ FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout);
+ FileReference foo = new FileReference("foo");
+ FileReference bar = new FileReference("bar");
+ List<FileReference> fileReferences = Arrays.asList(foo, bar);
+ fileDownloader.queueForDownload(fileReferences);
+
+ // Verify download status
+ assertDownloadStatus(fileDownloader, foo, 0.0);
+ assertDownloadStatus(fileDownloader, bar, 0.0);
+ }
+
+ @Test
+ public void receiveFile() throws IOException {
+ FileReference foo = new FileReference("foo");
+ String filename = "foo.jar";
+ receiveFile(foo, filename, "content");
+ File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename);
+ assertEquals("content", IOUtils.readFile(downloadedFile));
+ }
+
+ private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException {
+ File file = new File(new File(dir, fileReferenceString), fileName);
+ IOUtils.writeFile(file, "content", false);
+ }
+
+ private File fileReferenceFullPath(File dir, FileReference fileReference) {
+ return new File(dir, fileReference.value());
+ }
+
+ private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) {
+ double downloadStatus = fileDownloader.downloadStatus(fileReference);
+ assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
+ }
+
+ private void receiveFile(FileReference fileReference, String filename, String content) {
+ byte[] contentBytes = Utf8.toBytes(content);
+ long xxHashFromContent = hasher.hash(ByteBuffer.wrap(contentBytes), 0);
+ fileDownloader.receiveFile(fileReference, filename, contentBytes, xxHashFromContent);
+ }
+
+ private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {
+
+ private ResponseHandler responseHandler;
+
+ MockConnection() {
+ this(new FileReferenceFoundResponseHandler());
+ }
+
+ MockConnection(ResponseHandler responseHandler) {
+ this.responseHandler = responseHandler;
+ }
+
+ @Override
+ public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) {
+ responseHandler.request(request);
+ }
+
+ @Override
+ public void invokeSync(Request request, double jrtTimeout) {
+ responseHandler.request(request);
+ }
+
+ @Override
+ public void setError(int errorCode) {
+ }
+
+ @Override
+ public void setSuccess() {
+ }
+
+ @Override
+ public String getAddress() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void setError(Connection connection, int errorCode) {
+ connection.setError(errorCode);
+ }
+
+ @Override
+ public Connection getCurrent() {
+ return this;
+ }
+
+ @Override
+ public Connection setNewCurrentConnection() {
+ return this;
+ }
+
+ @Override
+ public int getSize() {
+ return 1;
+ }
+
+ @Override
+ public Supervisor getSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
+ void setResponseHandler(ResponseHandler responseHandler) {
+ this.responseHandler = responseHandler;
+ }
+
+ public interface ResponseHandler {
+ void request(Request request);
+ }
+
+ static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler {
+
+ @Override
+ public void request(Request request) {
+ if (request.methodName().equals("filedistribution.serveFile")) {
+ request.returnValues().add(new Int32Value(0));
+ request.returnValues().add(new StringValue("OK"));
+ }
+ }
+ }
+
+ static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler {
+
+ @Override
+ public void request(Request request) {
+ if (request.methodName().equals("filedistribution.serveFile")) {
+ request.returnValues().add(new Int32Value(1));
+ request.returnValues().add(new StringValue("Internal error"));
+ }
+ }
+ }
+
+ static class WaitResponseHandler implements MockConnection.ResponseHandler {
+
+ private final Duration waitUntilAnswering;
+
+ WaitResponseHandler(Duration waitUntilAnswering) {
+ super();
+ this.waitUntilAnswering = waitUntilAnswering;
+ }
+
+ @Override
+ public void request(Request request) {
+ try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ }
+
+ if (request.methodName().equals("filedistribution.serveFile")) {
+ request.returnValues().add(new Int32Value(0));
+ request.returnValues().add(new StringValue("OK"));
+ }
+
+ }
+ }
+ }
+
+}