aboutsummaryrefslogtreecommitdiffstats
path: root/filedistribution/src/main/java/com/yahoo/vespa
diff options
context:
space:
mode:
Diffstat (limited to 'filedistribution/src/main/java/com/yahoo/vespa')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java24
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java2
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java35
3 files changed, 29 insertions, 32 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 1d638a427f9..0678509dc68 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -11,7 +11,6 @@ import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.time.Duration;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -37,6 +36,7 @@ public class FileDownloader implements AutoCloseable {
private final Supervisor supervisor;
private final File downloadDirectory;
private final Duration timeout;
+ private final Duration sleepBetweenRetries;
private final FileReferenceDownloader fileReferenceDownloader;
private final Downloads downloads = new Downloads();
@@ -61,7 +61,8 @@ public class FileDownloader implements AutoCloseable {
this.supervisor = supervisor;
this.downloadDirectory = downloadDirectory;
this.timeout = timeout;
- // Needed to receive RPC receiveFile* calls from server after asking for files
+ this.sleepBetweenRetries = sleepBetweenRetries;
+ // Needed to receive RPC receiveFile* calls from server after starting download of file reference
new FileReceiver(supervisor, downloads, downloadDirectory);
this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout, sleepBetweenRetries);
}
@@ -83,12 +84,11 @@ public class FileDownloader implements AutoCloseable {
Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
- Objects.requireNonNull(fileReference, "file reference cannot be null");
Optional<File> file = getFileFromFileSystem(fileReference);
return (file.isPresent())
? CompletableFuture.completedFuture(file)
- : download(fileReferenceDownload);
+ : startDownload(fileReferenceDownload);
}
public Map<FileReference, Double> downloadStatus() { return downloads.downloadStatus(); }
@@ -119,18 +119,24 @@ public class FileDownloader implements AutoCloseable {
}
}
+ boolean fileReferenceExists(FileReference fileReference) {
+ return getFileFromFileSystem(fileReference).isPresent();
+ }
+
boolean isDownloading(FileReference fileReference) {
return downloads.get(fileReference).isPresent();
}
- /** Start a download, don't wait for result */
+ /** Start a download if needed, don't wait for result */
public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
- getFutureFile(fileReferenceDownload);
+ if (fileReferenceExists(fileReferenceDownload.fileReference())) return;
+
+ startDownload(fileReferenceDownload);
}
- /** Download, the future returned will be complete()d by receiving method in {@link FileReceiver} */
- private synchronized Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) {
- return fileReferenceDownloader.download(fileReferenceDownload);
+ /** Start downloading, the future returned will be complete()d by receiving method in {@link FileReceiver} */
+ private synchronized Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
+ return fileReferenceDownloader.startDownload(fileReferenceDownload);
}
public void close() {
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index 470d94ce749..21e35bf67af 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -5,6 +5,7 @@ package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
import java.io.File;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -22,6 +23,7 @@ public class FileReferenceDownload {
}
public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound, String client) {
+ Objects.requireNonNull(fileReference, "file reference cannot be null");
this.fileReference = fileReference;
this.future = new CompletableFuture<>();
this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound;
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 952684b7b0b..740bf23796f 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -11,7 +11,6 @@ import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
import java.time.Duration;
-import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -50,39 +49,29 @@ public class FileReferenceDownloader {
this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString));
}
- private void startDownload(FileReferenceDownload fileReferenceDownload) {
+ private void waitUntilDownloadStarted(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
- Instant end = Instant.now().plus(downloadTimeout);
- boolean downloadStarted = false;
int retryCount = 0;
do {
- try {
- if (startDownloadRpc(fileReferenceDownload, retryCount)) {
- downloadStarted = true;
- } else {
- retryCount++;
- long sleepTime = Math.min(sleepBetweenRetries.toMillis() * retryCount,
- Math.max(0, Duration.between(Instant.now(), end).toMillis()));
- Thread.sleep(sleepTime);
- }
- }
- catch (InterruptedException e) { /* ignored */}
- } while (Instant.now().isBefore(end) && !downloadStarted);
+ if (startDownloadRpc(fileReferenceDownload, retryCount))
+ return;
- if ( !downloadStarted) {
- fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
- downloads.remove(fileReference);
- }
+ try { Thread.sleep(sleepBetweenRetries.toMillis()); } catch (InterruptedException e) { /* ignored */}
+ retryCount++;
+ } while (retryCount < 5);
+
+ fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting " + fileReference));
+ downloads.remove(fileReference);
}
- Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) {
+ Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Optional<FileReferenceDownload> inProgress = downloads.get(fileReference);
if (inProgress.isPresent()) return inProgress.get().future();
log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
downloads.add(fileReferenceDownload);
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload));
+ downloadExecutor.submit(() -> waitUntilDownloadStarted(fileReferenceDownload));
return fileReferenceDownload.future();
}
@@ -99,7 +88,7 @@ public class FileReferenceDownloader {
double timeoutSecs = (double) rpcTimeout.getSeconds();
timeoutSecs += retryCount * 10.0;
connection.invokeSync(request, timeoutSecs);
- Level logLevel = (retryCount > 5 ? Level.INFO : Level.FINE);
+ Level logLevel = (retryCount > 3 ? Level.INFO : Level.FINE);
if (validateResponse(request)) {
log.log(Level.FINE, () -> "Request callback, OK. Req: " + request + "\nSpec: " + connection + ", retry count " + retryCount);
if (request.returnValues().get(0).asInt32() == 0) {