diff options
Diffstat (limited to 'filedistribution/src/main/java/com/yahoo/vespa')
3 files changed, 29 insertions, 32 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 1d638a427f9..0678509dc68 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -11,7 +11,6 @@ import com.yahoo.yolean.Exceptions; import java.io.File; import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -37,6 +36,7 @@ public class FileDownloader implements AutoCloseable { private final Supervisor supervisor; private final File downloadDirectory; private final Duration timeout; + private final Duration sleepBetweenRetries; private final FileReferenceDownloader fileReferenceDownloader; private final Downloads downloads = new Downloads(); @@ -61,7 +61,8 @@ public class FileDownloader implements AutoCloseable { this.supervisor = supervisor; this.downloadDirectory = downloadDirectory; this.timeout = timeout; - // Needed to receive RPC receiveFile* calls from server after asking for files + this.sleepBetweenRetries = sleepBetweenRetries; + // Needed to receive RPC receiveFile* calls from server after starting download of file reference new FileReceiver(supervisor, downloads, downloadDirectory); this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout, sleepBetweenRetries); } @@ -83,12 +84,11 @@ public class FileDownloader implements AutoCloseable { Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); - Objects.requireNonNull(fileReference, "file reference cannot be null"); Optional<File> file = getFileFromFileSystem(fileReference); return (file.isPresent()) ? CompletableFuture.completedFuture(file) - : download(fileReferenceDownload); + : startDownload(fileReferenceDownload); } public Map<FileReference, Double> downloadStatus() { return downloads.downloadStatus(); } @@ -119,18 +119,24 @@ public class FileDownloader implements AutoCloseable { } } + boolean fileReferenceExists(FileReference fileReference) { + return getFileFromFileSystem(fileReference).isPresent(); + } + boolean isDownloading(FileReference fileReference) { return downloads.get(fileReference).isPresent(); } - /** Start a download, don't wait for result */ + /** Start a download if needed, don't wait for result */ public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) { - getFutureFile(fileReferenceDownload); + if (fileReferenceExists(fileReferenceDownload.fileReference())) return; + + startDownload(fileReferenceDownload); } - /** Download, the future returned will be complete()d by receiving method in {@link FileReceiver} */ - private synchronized Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) { - return fileReferenceDownloader.download(fileReferenceDownload); + /** Start downloading, the future returned will be complete()d by receiving method in {@link FileReceiver} */ + private synchronized Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) { + return fileReferenceDownloader.startDownload(fileReferenceDownload); } public void close() { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index 470d94ce749..21e35bf67af 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -5,6 +5,7 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; import java.io.File; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -22,6 +23,7 @@ public class FileReferenceDownload { } public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound, String client) { + Objects.requireNonNull(fileReference, "file reference cannot be null"); this.fileReference = fileReference; this.future = new CompletableFuture<>(); this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound; diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 952684b7b0b..740bf23796f 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -11,7 +11,6 @@ import com.yahoo.vespa.config.ConnectionPool; import java.io.File; import java.time.Duration; -import java.time.Instant; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -50,39 +49,29 @@ public class FileReferenceDownloader { this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString)); } - private void startDownload(FileReferenceDownload fileReferenceDownload) { + private void waitUntilDownloadStarted(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); - Instant end = Instant.now().plus(downloadTimeout); - boolean downloadStarted = false; int retryCount = 0; do { - try { - if (startDownloadRpc(fileReferenceDownload, retryCount)) { - downloadStarted = true; - } else { - retryCount++; - long sleepTime = Math.min(sleepBetweenRetries.toMillis() * retryCount, - Math.max(0, Duration.between(Instant.now(), end).toMillis())); - Thread.sleep(sleepTime); - } - } - catch (InterruptedException e) { /* ignored */} - } while (Instant.now().isBefore(end) && !downloadStarted); + if (startDownloadRpc(fileReferenceDownload, retryCount)) + return; - if ( !downloadStarted) { - fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); - downloads.remove(fileReference); - } + try { Thread.sleep(sleepBetweenRetries.toMillis()); } catch (InterruptedException e) { /* ignored */} + retryCount++; + } while (retryCount < 5); + + fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting " + fileReference)); + downloads.remove(fileReference); } - Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) { + Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); Optional<FileReferenceDownload> inProgress = downloads.get(fileReference); if (inProgress.isPresent()) return inProgress.get().future(); log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout); downloads.add(fileReferenceDownload); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload)); + downloadExecutor.submit(() -> waitUntilDownloadStarted(fileReferenceDownload)); return fileReferenceDownload.future(); } @@ -99,7 +88,7 @@ public class FileReferenceDownloader { double timeoutSecs = (double) rpcTimeout.getSeconds(); timeoutSecs += retryCount * 10.0; connection.invokeSync(request, timeoutSecs); - Level logLevel = (retryCount > 5 ? Level.INFO : Level.FINE); + Level logLevel = (retryCount > 3 ? Level.INFO : Level.FINE); if (validateResponse(request)) { log.log(Level.FINE, () -> "Request callback, OK. Req: " + request + "\nSpec: " + connection + ", retry count " + retryCount); if (request.returnValues().get(0).asInt32() == 0) { |