diff options
4 files changed, 82 insertions, 45 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 9b9867326d7..5829ab37b77 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.ConnectionPool; @@ -13,6 +12,7 @@ import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -57,18 +57,14 @@ public class FileDownloader { } } - private Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { + Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); Objects.requireNonNull(fileReference, "file reference cannot be null"); - Optional<File> file = getFileFromFileSystem(fileReference, downloadDirectory); - if (file.isPresent()) { - SettableFuture<Optional<File>> future = SettableFuture.create(); - future.set(file); - return future; - } else { - return download(fileReferenceDownload); - } + Optional<File> file = getFileFromFileSystem(fileReference); + return (file.isPresent()) + ? CompletableFuture.completedFuture(file) + : download(fileReferenceDownload); } double downloadStatus(FileReference fileReference) { @@ -84,9 +80,9 @@ public class FileDownloader { } // Files are moved atomically, so if file reference exists and is accessible we can use it - private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) { - File[] files = new File(directory, fileReference.value()).listFiles(); - if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + private Optional<File> getFileFromFileSystem(FileReference fileReference) { + File[] files = new File(downloadDirectory, fileReference.value()).listFiles(); + if (downloadDirectory.exists() && downloadDirectory.isDirectory() && files != null && files.length > 0) { File file = files[0]; if (!file.exists()) { throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist"); @@ -103,7 +99,7 @@ public class FileDownloader { private boolean alreadyDownloaded(FileReference fileReference) { try { - return (getFileFromFileSystem(fileReference, downloadDirectory).isPresent()); + return (getFileFromFileSystem(fileReference).isPresent()); } catch (RuntimeException e) { return false; } @@ -112,15 +108,16 @@ public class FileDownloader { /** Start a download, don't wait for result */ public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); - if (alreadyDownloaded(fileReference) || fileReferenceDownloader.isDownloading(fileReference)) return; + if (alreadyDownloaded(fileReference)) return; - queueForDownload(fileReferenceDownload); + download(fileReferenceDownload); } + /** Download, the future returned will be complete()d by receiving method in {@link FileReceiver} */ private synchronized Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); - Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReferenceDownload)); - return (inProgress == null) ? queueForDownload(fileReferenceDownload) : inProgress; + FileReferenceDownload inProgress = fileReferenceDownloader.getDownloadInProgress(fileReference); + return (inProgress == null) ? queueForDownload(fileReferenceDownload) : inProgress.future(); } private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index 4a9fadf1a61..fe501484faf 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -2,16 +2,16 @@ package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import java.io.File; import java.util.Optional; +import java.util.concurrent.CompletableFuture; public class FileReferenceDownload { private final FileReference fileReference; - private final SettableFuture<Optional<File>> future; + private final CompletableFuture<Optional<File>> future; // If a config server wants to download from another config server (because it does not have the // file itself) we set this flag to true to avoid an eternal loop private final boolean downloadFromOtherSourceIfNotFound; @@ -22,7 +22,7 @@ public class FileReferenceDownload { public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound) { this.fileReference = fileReference; - this.future = SettableFuture.create(); + this.future = new CompletableFuture<>(); this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound; } @@ -30,7 +30,7 @@ public class FileReferenceDownload { return fileReference; } - SettableFuture<Optional<File>> future() { + CompletableFuture<Optional<File>> future() { return future; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index ae0040cc7dd..c4fe257c991 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.FileReference; import com.yahoo.jrt.Int32Value; @@ -18,6 +16,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -73,7 +72,7 @@ public class FileReferenceDownloader { } if ( !downloadStarted) { - fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); + fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); synchronized (downloads) { downloads.remove(fileReference); } @@ -96,7 +95,7 @@ public class FileReferenceDownloader { if (download != null) { downloadStatus.put(fileReference, 1.0); downloads.remove(fileReference); - download.future().set(Optional.of(file)); + download.future().complete(Optional.of(file)); } else { log.log(LogLevel.DEBUG, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts"); } @@ -143,15 +142,10 @@ public class FileReferenceDownloader { } } - ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + FileReferenceDownload getDownloadInProgress(FileReference fileReference) { synchronized (downloads) { - FileReferenceDownload download = downloads.get(fileReference); - if (download != null) { - download.future().addListener(runnable, downloadExecutor); - return download.future(); - } + return downloads.get(fileReference); } - return null; } private void execute(Request request, Connection connection) { @@ -189,7 +183,7 @@ public class FileReferenceDownloader { Map<FileReference, Double> downloadStatus() { synchronized (downloads) { - return ImmutableMap.copyOf(downloadStatus); + return Map.copyOf(downloadStatus); } } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 92f5cd51d87..52d8507acea 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -27,6 +27,10 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static com.yahoo.jrt.ErrorCode.CONNECTION; import static org.junit.Assert.assertEquals; @@ -35,6 +39,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class FileDownloaderTest { + private static final Duration sleepBetweenRetries = Duration.ofMillis(10); private MockConnection connection; private FileDownloader fileDownloader; @@ -47,7 +52,7 @@ public class FileDownloaderTest { downloadDir = Files.createTempDirectory("filedistribution").toFile(); tempDir = Files.createTempDirectory("download").toFile(); connection = new MockConnection(); - fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(2), Duration.ofMillis(100)); + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -108,7 +113,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -142,7 +147,7 @@ public class FileDownloaderTest { File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); byte[] tarredContent = IOUtils.readFileBytes(tarFile); - receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent); + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.compressed, tarredContent); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -158,7 +163,7 @@ public class FileDownloaderTest { @Test public void getFileWhenConnectionError() throws IOException { - fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), Duration.ofMillis(100)); + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries); File downloadDir = fileDownloader.downloadDirectory(); int timesToFail = 2; @@ -175,7 +180,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); File downloadedFileFullPath = new File(fileReferenceFullPath, filename); @@ -189,9 +194,47 @@ public class FileDownloaderTest { } @Test + public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(2); + String filename = "abc.jar"; + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), sleepBetweenRetries); + File downloadDir = fileDownloader.downloadDirectory(); + + // Delay response so that we can make a second request while downloading the file from the first request + connection.setResponseHandler(new MockConnection.WaitResponseHandler(Duration.ofSeconds(1))); + + FileReference fileReference = new FileReference("fileReference"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference); + + Future<Future<Optional<File>>> future1 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload)); + do { + Thread.sleep(10); + } while (! fileDownloader.fileReferenceDownloader().isDownloading(fileReference)); + assertTrue(fileDownloader.fileReferenceDownloader().isDownloading(fileReference)); + + // Request file while download is in progress + Future<Future<Optional<File>>> future2 = executor.submit(() -> fileDownloader.getFutureFile(fileReferenceDownload)); + + // Receive file, will complete downloading and futures + receiveFile(fileDownloader, fileReference, filename, FileReferenceData.Type.file, "some other content"); + + // Check that we got file correctly with first request + Optional<File> downloadedFile = future1.get().get(); + assertTrue(downloadedFile.isPresent()); + File downloadedFileFullPath = new File(fileReferenceFullPath, filename); + assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); + assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); + + // Check that request done while downloading works + downloadedFile = future2.get().get(); + assertTrue(downloadedFile.isPresent()); + executor.shutdownNow(); + } + + @Test public void setFilesToDownload() throws IOException { Duration timeout = Duration.ofMillis(200); - Duration sleepBetweenRetries = Duration.ofMillis(200); MockConnection connectionPool = new MockConnection(); connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries); @@ -212,7 +255,7 @@ public class FileDownloaderTest { public void receiveFile() throws IOException { FileReference foo = new FileReference("foo"); String filename = "foo.jar"; - receiveFile(foo, filename, FileReferenceData.Type.file, "content"); + receiveFile(fileDownloader, foo, filename, FileReferenceData.Type.file, "content"); File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); assertEquals("content", IOUtils.readFile(downloadedFile)); } @@ -233,16 +276,19 @@ public class FileDownloaderTest { assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); } - private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) { - receiveFile(fileReference, filename, type, Utf8.toBytes(content)); + private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename, + FileReferenceData.Type type, String content) { + receiveFile(fileDownloader, fileReference, filename, type, Utf8.toBytes(content)); } - private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) { + private void receiveFile(FileDownloader fileDownloader, FileReference fileReference, String filename, + FileReferenceData.Type type, byte[] content) { XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); FileReceiver.Session session = new FileReceiver.Session(downloadDir, tempDir, 1, fileReference, type, filename, content.length); session.addPart(0, content); - session.close(hasher.hash(ByteBuffer.wrap(content), 0)); + File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0)); + fileDownloader.fileReferenceDownloader().completedDownloading(fileReference, file); } private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { |