aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-04-03 08:26:18 +0200
committerGitHub <noreply@github.com>2020-04-03 08:26:18 +0200
commit79ed3f35ea8bd7ca8e671676fad510f2976f0e87 (patch)
treeb73e0f6728ba07cba1612f90b96b96ddeb5473e3
parent4de936c04e19e372dcdc47f6c4ed1e50a2d7c433 (diff)
parent6bd0cc7db0f292a017dd7aca745507e6e00b1cd8 (diff)
Merge pull request #12815 from vespa-engine/hmusum/refactor-filedistribution-1
Refactor filedistribution 1
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java56
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java8
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java20
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java76
4 files changed, 96 insertions, 64 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 24b3fcac3e3..5829ab37b77 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ConnectionPool;
@@ -13,6 +12,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -57,21 +57,14 @@ public class FileDownloader {
}
}
- private Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
+ Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Objects.requireNonNull(fileReference, "file reference cannot be null");
- log.log(LogLevel.DEBUG, () -> "Checking if file reference '" + fileReference.value() + "' exists in '" +
- downloadDirectory.getAbsolutePath() + "' ");
- Optional<File> file = getFileFromFileSystem(fileReference, downloadDirectory);
- if (file.isPresent()) {
- SettableFuture<Optional<File>> future = SettableFuture.create();
- future.set(file);
- return future;
- } else {
- log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' not found in " +
- downloadDirectory.getAbsolutePath() + ", starting download");
- return download(fileReferenceDownload);
- }
+
+ Optional<File> file = getFileFromFileSystem(fileReference);
+ return (file.isPresent())
+ ? CompletableFuture.completedFuture(file)
+ : download(fileReferenceDownload);
}
double downloadStatus(FileReference fileReference) {
@@ -86,9 +79,10 @@ public class FileDownloader {
return downloadDirectory;
}
- private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) {
- File[] files = new File(directory, fileReference.value()).listFiles();
- if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
+ // Files are moved atomically, so if file reference exists and is accessible we can use it
+ private Optional<File> getFileFromFileSystem(FileReference fileReference) {
+ File[] files = new File(downloadDirectory, fileReference.value()).listFiles();
+ if (downloadDirectory.exists() && downloadDirectory.isDirectory() && files != null && files.length > 0) {
File file = files[0];
if (!file.exists()) {
throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist");
@@ -105,33 +99,25 @@ public class FileDownloader {
private boolean alreadyDownloaded(FileReference fileReference) {
try {
- return (getFileFromFileSystem(fileReference, downloadDirectory).isPresent());
+ return (getFileFromFileSystem(fileReference).isPresent());
} catch (RuntimeException e) {
return false;
}
}
- public boolean downloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
- if (!alreadyDownloaded(fileReferenceDownload.fileReference())) {
- download(fileReferenceDownload);
- return true;
- } else {
- log.log(LogLevel.DEBUG, () -> "Download not needed, " + fileReferenceDownload.fileReference() + " already downloaded" );
- return false;
- }
+ /** Start a download, don't wait for result */
+ public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
+ FileReference fileReference = fileReferenceDownload.fileReference();
+ if (alreadyDownloaded(fileReference)) return;
+
+ download(fileReferenceDownload);
}
+ /** Download, the future returned will be complete()d by receiving method in {@link FileReceiver} */
private synchronized Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
- Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReferenceDownload));
- if (inProgress != null) {
- log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'");
- return inProgress;
- }
-
- Future<Optional<File>> future = queueForDownload(fileReferenceDownload);
- log.log(LogLevel.DEBUG, () -> "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
- return future;
+ FileReferenceDownload inProgress = fileReferenceDownloader.getDownloadInProgress(fileReference);
+ return (inProgress == null) ? queueForDownload(fileReferenceDownload) : inProgress.future();
}
private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) {
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index 4a9fadf1a61..fe501484faf 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -2,16 +2,16 @@
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import java.io.File;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
public class FileReferenceDownload {
private final FileReference fileReference;
- private final SettableFuture<Optional<File>> future;
+ private final CompletableFuture<Optional<File>> future;
// If a config server wants to download from another config server (because it does not have the
// file itself) we set this flag to true to avoid an eternal loop
private final boolean downloadFromOtherSourceIfNotFound;
@@ -22,7 +22,7 @@ public class FileReferenceDownload {
public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound) {
this.fileReference = fileReference;
- this.future = SettableFuture.create();
+ this.future = new CompletableFuture<>();
this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound;
}
@@ -30,7 +30,7 @@ public class FileReferenceDownload {
return fileReference;
}
- SettableFuture<Optional<File>> future() {
+ CompletableFuture<Optional<File>> future() {
return future;
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 8a0212211e1..c4fe257c991 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.Int32Value;
@@ -18,6 +16,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -40,7 +39,9 @@ public class FileReferenceDownloader {
Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
new DaemonThreadFactory("filereference downloader"));
private final ConnectionPool connectionPool;
+ /* Ongoing downloads */
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
+ /* Status for ongoing and finished downloads */
private final Map<FileReference, Double> downloadStatus = new HashMap<>(); // between 0 and 1
private final Duration downloadTimeout;
private final Duration sleepBetweenRetries;
@@ -71,7 +72,7 @@ public class FileReferenceDownloader {
}
if ( !downloadStarted) {
- fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
+ fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
synchronized (downloads) {
downloads.remove(fileReference);
}
@@ -94,7 +95,7 @@ public class FileReferenceDownloader {
if (download != null) {
downloadStatus.put(fileReference, 1.0);
downloads.remove(fileReference);
- download.future().set(Optional.of(file));
+ download.future().complete(Optional.of(file));
} else {
log.log(LogLevel.DEBUG, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
}
@@ -141,15 +142,10 @@ public class FileReferenceDownloader {
}
}
- ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ FileReferenceDownload getDownloadInProgress(FileReference fileReference) {
synchronized (downloads) {
- FileReferenceDownload download = downloads.get(fileReference);
- if (download != null) {
- download.future().addListener(runnable, downloadExecutor);
- return download.future();
- }
+ return downloads.get(fileReference);
}
- return null;
}
private void execute(Request request, Connection connection) {
@@ -187,7 +183,7 @@ public class FileReferenceDownloader {
Map<FileReference, Double> downloadStatus() {
synchronized (downloads) {
- return ImmutableMap.copyOf(downloadStatus);
+ return Map.copyOf(downloadStatus);
}
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index d1d12cb07b7..52d8507acea 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -27,6 +27,10 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static com.yahoo.jrt.ErrorCode.CONNECTION;
import static org.junit.Assert.assertEquals;
@@ -35,6 +39,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class FileDownloaderTest {
+ private static final Duration sleepBetweenRetries = Duration.ofMillis(10);
private MockConnection connection;
private FileDownloader fileDownloader;
@@ -47,7 +52,7 @@ public class FileDownloaderTest {
downloadDir = Files.createTempDirectory("filedistribution").toFile();
tempDir = Files.createTempDirectory("download").toFile();
connection = new MockConnection();
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(2), Duration.ofMillis(100));
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -108,7 +113,7 @@ public class FileDownloaderTest {
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
- receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content");
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content");
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
@@ -142,7 +147,7 @@ public class FileDownloaderTest {
File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
byte[] tarredContent = IOUtils.readFileBytes(tarFile);
- receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent);
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.compressed, tarredContent);
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
@@ -158,7 +163,7 @@ public class FileDownloaderTest {
@Test
public void getFileWhenConnectionError() throws IOException {
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), Duration.ofMillis(100));
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries);
File downloadDir = fileDownloader.downloadDirectory();
int timesToFail = 2;
@@ -175,7 +180,7 @@ public class FileDownloaderTest {
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
- receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content");
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content");
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
@@ -189,26 +194,68 @@ public class FileDownloaderTest {
}
@Test
+ public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException {
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ String filename = "abc.jar";
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), sleepBetweenRetries);
+ File downloadDir = fileDownloader.downloadDirectory();
+
+ // Delay response so that we can make a second request while downloading the file from the first request
+ connection.setResponseHandler(new MockConnection.WaitResponseHandler(Duration.ofSeconds(1)));
+
+ FileReference fileReference = new FileReference("fileReference");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference);
+ FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference);
+
+ Future<Future<Optional<File>>> future1 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload));
+ do {
+ Thread.sleep(10);
+ } while (! fileDownloader.fileReferenceDownloader().isDownloading(fileReference));
+ assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(fileReference));
+
+ // Request file while download is in progress
+ Future<Future<Optional<File>>> future2 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload));
+
+ // Receive file, will complete downloading and futures
+ receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content");
+
+ // Check that we got file correctly with first request
+ Optional<File> downloadedFile = future1.get().get();
+ assertTrue(downloadedFile.isPresent());
+ File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
+ assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath());
+ assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
+
+ // Check that request done while downloading works
+ downloadedFile = future2.get().get();
+ assertTrue(downloadedFile.isPresent());
+ executor.shutdownNow();
+ }
+
+ @Test
public void setFilesToDownload() throws IOException {
Duration timeout = Duration.ofMillis(200);
- Duration sleepBetweenRetries = Duration.ofMillis(200);
MockConnection connectionPool = new MockConnection();
connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries);
FileReference foo = new FileReference("foo");
// Should download since we do not have the file on disk
- assertTrue(fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo)));
+ fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo));
+ assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(foo));
+ assertFalse(fileDownloader.getFile(foo).isPresent());
// Receive files to simulate download
receiveFile();
// Should not download, since file has already been downloaded
- assertFalse(fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo)));
+ fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo));
+ // and file should be available
+ assertTrue(fileDownloader.getFile(foo).isPresent());
}
@Test
public void receiveFile() throws IOException {
FileReference foo = new FileReference("foo");
String filename = "foo.jar";
- receiveFile(foo, filename, FileReferenceData.Type.file, "content");
+ receiveFile(fileDownloader, foo, filename, FileReferenceData.Type.file, "content");
File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename);
assertEquals("content", IOUtils.readFile(downloadedFile));
}
@@ -229,16 +276,19 @@ public class FileDownloaderTest {
assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
}
- private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) {
- receiveFile(fileReference, filename, type, Utf8.toBytes(content));
+ private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename,
+ FileReferenceData.Type type, String content) {
+ receiveFile(fileDownloader, fileReference, filename, type, Utf8.toBytes(content));
}
- private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) {
+ private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename,
+ FileReferenceData.Type type, byte[] content) {
XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
FileReceiver.Session session =
new FileReceiver.Session(downloadDir, tempDir, 1, fileReference, type, filename, content.length);
session.addPart(0, content);
- session.close(hasher.hash(ByteBuffer.wrap(content), 0));
+ File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0));
+ fileDownloader.fileReferenceDownloader().completedDownloading(fileReference, file);
}
private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {