diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-10-01 16:15:31 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-10-01 16:15:31 +0200 |
commit | e93dec4415091999a61903e7e14c6092f4608c92 (patch) | |
tree | db5030d6f901fdcc2dfa55cde11d6a8a93b5698e /filedistribution | |
parent | a8da8f0344d98e0a1218afd4fd9fe9ddc02f0661 (diff) |
Create classes for keeping track of downloads and download statuses
Diffstat (limited to 'filedistribution')
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java | 164 |
1 files changed, 117 insertions, 47 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 815fdb194cf..13c089a1093 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -6,23 +6,24 @@ import com.yahoo.config.FileReference; import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Request; import com.yahoo.jrt.StringValue; - -import java.util.concurrent.Future; -import java.util.logging.Level; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; import java.io.File; import java.time.Duration; import java.time.Instant; -import java.util.HashMap; -import java.util.LinkedHashMap; +import java.util.Collections; +import java.util.Comparator; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Downloads file reference using rpc requests to config server and keeps track of files being downloaded @@ -41,9 +42,9 @@ public class FileReferenceDownloader { new DaemonThreadFactory("filereference downloader")); private final ConnectionPool connectionPool; /* Ongoing downloads */ - private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); + private final Downloads downloads = new Downloads(); /* Status for ongoing and finished downloads */ - private final Map<FileReference, Double> downloadStatus = new HashMap<>(); // between 0 and 1 + private final DownloadStatuses downloadStatuses = new DownloadStatuses(); private final Duration downloadTimeout; private final Duration sleepBetweenRetries; @@ -74,44 +75,36 @@ public class FileReferenceDownloader { if ( !downloadStarted) { fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); - synchronized (downloads) { - downloads.remove(fileReference); - } + downloads.remove(fileReference); } } Future<Optional<File>> download(FileReferenceDownload fileReferenceDownload) { - synchronized (downloads) { - FileReference fileReference = fileReferenceDownload.fileReference(); - FileReferenceDownload inProgress = downloads.get(fileReference); - if (inProgress != null) return inProgress.future(); + FileReference fileReference = fileReferenceDownload.fileReference(); + Optional<FileReferenceDownload> inProgress = downloads.get(fileReference); + if (inProgress.isPresent()) return inProgress.get().future(); - log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout); - downloads.put(fileReference, fileReferenceDownload); - downloadStatus.put(fileReference, 0.0); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload)); - return fileReferenceDownload.future(); - } + log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout); + downloads.add(fileReferenceDownload); + downloadStatuses.add(fileReference); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload)); + return fileReferenceDownload.future(); } void completedDownloading(FileReference fileReference, File file) { - synchronized (downloads) { - FileReferenceDownload download = downloads.get(fileReference); - if (download != null) { - downloadStatus.put(fileReference, 1.0); - downloads.remove(fileReference); - download.future().complete(Optional.of(file)); - } else { - log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts"); - } + Optional<FileReferenceDownload> download = downloads.get(fileReference); + if (download.isPresent()) { + downloadStatuses.get(fileReference).ifPresent(DownloadStatus::finished); + downloads.remove(fileReference); + download.get().future().complete(Optional.of(file)); + } else { + log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts"); } } void failedDownloading(FileReference fileReference) { - synchronized (downloads) { - downloadStatus.put(fileReference, 0.0); - downloads.remove(fileReference); - } + downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0)); + downloads.remove(fileReference); } private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload, int retryCount) { @@ -143,9 +136,7 @@ public class FileReferenceDownloader { } boolean isDownloading(FileReference fileReference) { - synchronized (downloads) { - return downloads.containsKey(fileReference); - } + return downloads.get(fileReference).isPresent(); } private boolean validateResponse(Request request) { @@ -162,25 +153,23 @@ public class FileReferenceDownloader { double downloadStatus(String file) { double status = 0.0; - synchronized (downloads) { - Double download = downloadStatus.get(new FileReference(file)); - if (download != null) { - status = download; - } + Optional<DownloadStatus> downloadStatus = downloadStatuses.get(new FileReference(file)); + if (downloadStatus.isPresent()) { + status = downloadStatus.get().progress(); } return status; } void setDownloadStatus(FileReference fileReference, double completeness) { - synchronized (downloads) { - downloadStatus.put(fileReference, completeness); - } + Optional<DownloadStatus> downloadStatus = downloadStatuses.get(fileReference); + if (downloadStatus.isPresent()) + downloadStatus.get().setProgress(completeness); + else + downloadStatuses.add(fileReference, completeness); } Map<FileReference, Double> downloadStatus() { - synchronized (downloads) { - return Map.copyOf(downloadStatus); - } + return downloadStatuses.all().values().stream().collect(Collectors.toMap(DownloadStatus::fileReference, DownloadStatus::progress)); } public ConnectionPool connectionPool() { @@ -194,4 +183,85 @@ public class FileReferenceDownloader { Thread.interrupted(); // Ignore and continue shutdown. } } + + private static class Downloads { + private final Map<FileReference, FileReferenceDownload> downloads = new ConcurrentHashMap<>(); + + void add(FileReferenceDownload fileReferenceDownload) { + downloads.put(fileReferenceDownload.fileReference(), fileReferenceDownload); + } + + void remove(FileReference fileReference) { + downloads.remove(fileReference); + } + + Optional<FileReferenceDownload> get(FileReference fileReference) { + return Optional.ofNullable(downloads.get(fileReference)); + } + } + + private static class DownloadStatus { + private final FileReference fileReference; + private double progress; // between 0 and 1 + private final Instant created; + + DownloadStatus(FileReference fileReference) { + this.fileReference = fileReference; + this.progress = 0.0; + this.created = Instant.now(); + } + + public FileReference fileReference() { + return fileReference; + } + + public double progress() { + return progress; + } + + public void setProgress(double progress) { + this.progress = progress; + } + + public void finished() { + setProgress(1.0); + } + + public Instant created() { + return created; + } + } + + /* Status for ongoing and completed downloads, keeps at most status for 100 last downloads */ + private static class DownloadStatuses { + + private static final int maxEntries = 100; + + private final Map<FileReference, DownloadStatus> downloadStatus = new ConcurrentHashMap<>(); + + void add(FileReference fileReference) { + add(fileReference, 0.0); + } + + void add(FileReference fileReference, double progress) { + DownloadStatus ds = new DownloadStatus(fileReference); + ds.setProgress(progress); + downloadStatus.put(fileReference, ds); + if (downloadStatus.size() > maxEntries) { + Map.Entry<FileReference, DownloadStatus> oldest = + Collections.min(downloadStatus.entrySet(), Comparator.comparing(e -> e.getValue().created)); + downloadStatus.remove(oldest.getKey()); + } + } + + Optional<DownloadStatus> get(FileReference fileReference) { + return Optional.ofNullable(downloadStatus.get(fileReference)); + } + + Map<FileReference, DownloadStatus> all() { + return Map.copyOf(downloadStatus); + } + + } + } |