summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-10-01 16:15:31 +0200
committerHarald Musum <musum@verizonmedia.com>2020-10-01 16:15:31 +0200
commite93dec4415091999a61903e7e14c6092f4608c92 (patch)
treedb5030d6f901fdcc2dfa55cde11d6a8a93b5698e /filedistribution
parenta8da8f0344d98e0a1218afd4fd9fe9ddc02f0661 (diff)
Create classes for keeping track of downloads and download statuses
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java164
1 files changed, 117 insertions, 47 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 815fdb194cf..13c089a1093 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -6,23 +6,24 @@ import com.yahoo.config.FileReference;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.StringValue;
-
-import java.util.concurrent.Future;
-import java.util.logging.Level;
import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* Downloads file reference using rpc requests to config server and keeps track of files being downloaded
@@ -41,9 +42,9 @@ public class FileReferenceDownloader {
new DaemonThreadFactory("filereference downloader"));
private final ConnectionPool connectionPool;
/* Ongoing downloads */
- private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
+ private final Downloads downloads = new Downloads();
/* Status for ongoing and finished downloads */
- private final Map<FileReference, Double> downloadStatus = new HashMap<>(); // between 0 and 1
+ private final DownloadStatuses downloadStatuses = new DownloadStatuses();
private final Duration downloadTimeout;
private final Duration sleepBetweenRetries;
@@ -74,44 +75,36 @@ public class FileReferenceDownloader {
if ( !downloadStarted) {
fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
- synchronized (downloads) {
- downloads.remove(fileReference);
- }
+ downloads.remove(fileReference);
}
}
Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) {
- synchronized (downloads) {
- FileReference fileReference = fileReferenceDownload.fileReference();
- FileReferenceDownload inProgress = downloads.get(fileReference);
- if (inProgress != null) return inProgress.future();
+ FileReference fileReference = fileReferenceDownload.fileReference();
+ Optional<FileReferenceDownload> inProgress = downloads.get(fileReference);
+ if (inProgress.isPresent()) return inProgress.get().future();
- log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
- downloads.put(fileReference, fileReferenceDownload);
- downloadStatus.put(fileReference, 0.0);
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload));
- return fileReferenceDownload.future();
- }
+ log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
+ downloads.add(fileReferenceDownload);
+ downloadStatuses.add(fileReference);
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload));
+ return fileReferenceDownload.future();
}
void completedDownloading(FileReference fileReference, File file) {
- synchronized (downloads) {
- FileReferenceDownload download = downloads.get(fileReference);
- if (download != null) {
- downloadStatus.put(fileReference, 1.0);
- downloads.remove(fileReference);
- download.future().complete(Optional.of(file));
- } else {
- log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
- }
+ Optional<FileReferenceDownload> download = downloads.get(fileReference);
+ if (download.isPresent()) {
+ downloadStatuses.get(fileReference).ifPresent(DownloadStatus::finished);
+ downloads.remove(fileReference);
+ download.get().future().complete(Optional.of(file));
+ } else {
+ log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
}
}
void failedDownloading(FileReference fileReference) {
- synchronized (downloads) {
- downloadStatus.put(fileReference, 0.0);
- downloads.remove(fileReference);
- }
+ downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0));
+ downloads.remove(fileReference);
}
private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, int retryCount) {
@@ -143,9 +136,7 @@ public class FileReferenceDownloader {
}
boolean isDownloading(FileReference fileReference) {
- synchronized (downloads) {
- return downloads.containsKey(fileReference);
- }
+ return downloads.get(fileReference).isPresent();
}
private boolean validateResponse(Request request) {
@@ -162,25 +153,23 @@ public class FileReferenceDownloader {
double downloadStatus(String file) {
double status = 0.0;
- synchronized (downloads) {
- Double download = downloadStatus.get(new FileReference(file));
- if (download != null) {
- status = download;
- }
+ Optional<DownloadStatus> downloadStatus = downloadStatuses.get(new FileReference(file));
+ if (downloadStatus.isPresent()) {
+ status = downloadStatus.get().progress();
}
return status;
}
void setDownloadStatus(FileReference fileReference, double completeness) {
- synchronized (downloads) {
- downloadStatus.put(fileReference, completeness);
- }
+ Optional<DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
+ if (downloadStatus.isPresent())
+ downloadStatus.get().setProgress(completeness);
+ else
+ downloadStatuses.add(fileReference, completeness);
}
Map<FileReference, Double> downloadStatus() {
- synchronized (downloads) {
- return Map.copyOf(downloadStatus);
- }
+ return downloadStatuses.all().values().stream().collect(Collectors.toMap(DownloadStatus::fileReference, DownloadStatus::progress));
}
public ConnectionPool connectionPool() {
@@ -194,4 +183,85 @@ public class FileReferenceDownloader {
Thread.interrupted(); // Ignore and continue shutdown.
}
}
+
+ private static class Downloads {
+ private final Map<FileReference, FileReferenceDownload> downloads = new ConcurrentHashMap<>();
+
+ void add(FileReferenceDownload fileReferenceDownload) {
+ downloads.put(fileReferenceDownload.fileReference(), fileReferenceDownload);
+ }
+
+ void remove(FileReference fileReference) {
+ downloads.remove(fileReference);
+ }
+
+ Optional<FileReferenceDownload> get(FileReference fileReference) {
+ return Optional.ofNullable(downloads.get(fileReference));
+ }
+ }
+
+ private static class DownloadStatus {
+ private final FileReference fileReference;
+ private double progress; // between 0 and 1
+ private final Instant created;
+
+ DownloadStatus(FileReference fileReference) {
+ this.fileReference = fileReference;
+ this.progress = 0.0;
+ this.created = Instant.now();
+ }
+
+ public FileReference fileReference() {
+ return fileReference;
+ }
+
+ public double progress() {
+ return progress;
+ }
+
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+
+ public void finished() {
+ setProgress(1.0);
+ }
+
+ public Instant created() {
+ return created;
+ }
+ }
+
+ /* Status for ongoing and completed downloads, keeps at most status for 100 last downloads */
+ private static class DownloadStatuses {
+
+ private static final int maxEntries = 100;
+
+ private final Map<FileReference, DownloadStatus> downloadStatus = new ConcurrentHashMap<>();
+
+ void add(FileReference fileReference) {
+ add(fileReference, 0.0);
+ }
+
+ void add(FileReference fileReference, double progress) {
+ DownloadStatus ds = new DownloadStatus(fileReference);
+ ds.setProgress(progress);
+ downloadStatus.put(fileReference, ds);
+ if (downloadStatus.size() > maxEntries) {
+ Map.Entry<FileReference, DownloadStatus> oldest =
+ Collections.min(downloadStatus.entrySet(), Comparator.comparing(e -> e.getValue().created));
+ downloadStatus.remove(oldest.getKey());
+ }
+ }
+
+ Optional<DownloadStatus> get(FileReference fileReference) {
+ return Optional.ofNullable(downloadStatus.get(fileReference));
+ }
+
+ Map<FileReference, DownloadStatus> all() {
+ return Map.copyOf(downloadStatus);
+ }
+
+ }
+
}