diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-06-14 19:41:50 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2021-06-14 19:41:50 +0200 |
commit | 7cb76ca350980df5117aed5e673c71aaffb69e6a (patch) | |
tree | c4f7a05ef0a303e20f53088042ce55bcf2771d5e /filedistribution/src/main/java/com/yahoo/vespa | |
parent | 4e4fc3e5160efe8549e16f50ab12e9266843e538 (diff) |
Move more methods into Downloads
Diffstat (limited to 'filedistribution/src/main/java/com/yahoo/vespa')
4 files changed, 72 insertions, 62 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java index 3942b11ad03..eb0976edc40 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java @@ -3,30 +3,73 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; +import java.io.File; import java.time.Instant; import java.util.Collections; import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Keeps track of downloads and download status * * @author hmusum */ -class Downloads { +public class Downloads { + + private static final Logger log = Logger.getLogger(Downloads.class.getName()); private final Map<FileReference, FileReferenceDownload> downloads = new ConcurrentHashMap<>(); + private final DownloadStatuses downloadStatuses = new DownloadStatuses(); + + public DownloadStatuses downloadStatuses() { return downloadStatuses; } + + void setDownloadStatus(FileReference fileReference, double completeness) { + Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference); + if (downloadStatus.isPresent()) + downloadStatus.get().setProgress(completeness); + else + downloadStatuses.add(fileReference, completeness); + } + + void completedDownloading(FileReference fileReference, File file) { + Optional<FileReferenceDownload> download = get(fileReference); + if (download.isPresent()) { + downloadStatuses().get(fileReference).ifPresent(Downloads.DownloadStatus::finished); + downloads.remove(fileReference); + download.get().future().complete(Optional.of(file)); + } else { + log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts"); + } + } void add(FileReferenceDownload fileReferenceDownload) { downloads.put(fileReferenceDownload.fileReference(), fileReferenceDownload); + downloadStatuses.add(fileReferenceDownload.fileReference()); } void remove(FileReference fileReference) { + downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0)); downloads.remove(fileReference); } + double downloadStatus(FileReference fileReference) { + double status = 0.0; + Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference); + if (downloadStatus.isPresent()) { + status = downloadStatus.get().progress(); + } + return status; + } + + Map<FileReference, Double> downloadStatus() { + return downloadStatuses.all().values().stream().collect(Collectors.toMap(Downloads.DownloadStatus::fileReference, Downloads.DownloadStatus::progress)); + } + Optional<FileReferenceDownload> get(FileReference fileReference) { return Optional.ofNullable(downloads.get(fileReference)); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index b1e43e4cee1..a0ea0d51ccb 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -33,20 +33,24 @@ public class FileDownloader implements AutoCloseable { private final File downloadDirectory; private final Duration timeout; private final FileReferenceDownloader fileReferenceDownloader; + private final Downloads downloads; public FileDownloader(ConnectionPool connectionPool) { - this(connectionPool, defaultDownloadDirectory ); + this(connectionPool, defaultDownloadDirectory, new Downloads()); } - public FileDownloader(ConnectionPool connectionPool, File downloadDirectory) { + public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads) { // TODO: Reduce timeout even more, timeout is so long that we might get starvation - this(connectionPool, downloadDirectory, downloadDirectory, Duration.ofMinutes(5), Duration.ofSeconds(10)); + this(connectionPool, downloadDirectory, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10)); } - public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Duration timeout, Duration sleepBetweenRetries) { + public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Downloads downloads, + Duration timeout, Duration sleepBetweenRetries) { this.downloadDirectory = downloadDirectory; this.timeout = timeout; - this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool, timeout, sleepBetweenRetries); + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool, + downloads, timeout, sleepBetweenRetries); + this.downloads = downloads; } public Optional<File> getFile(FileReference fileReference) { @@ -74,12 +78,8 @@ public class FileDownloader implements AutoCloseable { : download(fileReferenceDownload); } - double downloadStatus(FileReference fileReference) { - return fileReferenceDownloader.downloadStatus(fileReference.value()); - } - public Map<FileReference, Double> downloadStatus() { - return fileReferenceDownloader.downloadStatus(); + return downloads.downloadStatus(); } File downloadDirectory() { @@ -94,10 +94,10 @@ public class FileDownloader implements AutoCloseable { if (!file.exists()) { throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist"); } else if (!file.canRead()) { - throw new RuntimeException("File reference '" + fileReference.value() + "'exists, but unable to read it"); + throw new RuntimeException("File reference '" + fileReference.value() + "' exists, but unable to read it"); } else { log.log(Level.FINE, () -> "File reference '" + fileReference.value() + "' found: " + file.getAbsolutePath()); - fileReferenceDownloader.setDownloadStatus(fileReference, 1.0); + downloads.setDownloadStatus(fileReference, 1.0); return Optional.of(file); } } @@ -106,7 +106,7 @@ public class FileDownloader implements AutoCloseable { private boolean alreadyDownloaded(FileReference fileReference) { try { - return (getFileFromFileSystem(fileReference).isPresent()); + return getFileFromFileSystem(fileReference).isPresent(); } catch (RuntimeException e) { return false; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 1bc44e0bed2..875ca3cac29 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -8,7 +8,6 @@ import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Method; import com.yahoo.jrt.Request; import com.yahoo.jrt.Supervisor; -import java.util.logging.Level; import net.jpountz.xxhash.StreamingXXHash64; import net.jpountz.xxhash.XXHashFactory; @@ -22,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -39,7 +39,7 @@ public class FileReceiver { public final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof"; private final Supervisor supervisor; - private final FileReferenceDownloader downloader; + private final Downloads downloads; private final File downloadDirectory; // Should be on same partition as downloadDirectory to make sure moving files from tmpDirectory // to downloadDirectory is atomic @@ -149,9 +149,9 @@ public class FileReceiver { } } - FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory, File tmpDirectory) { + FileReceiver(Supervisor supervisor, Downloads downloads, File downloadDirectory, File tmpDirectory) { this.supervisor = supervisor; - this.downloader = downloader; + this.downloads = downloads; this.downloadDirectory = downloadDirectory; this.tmpDirectory = tmpDirectory; registerMethods(); @@ -260,7 +260,7 @@ public class FileReceiver { } double completeness = (double) session.currentFileSize / (double) session.fileSize; log.log(Level.FINEST, () -> String.format("%.1f percent of '%s' downloaded", completeness * 100, reference.value())); - downloader.setDownloadStatus(reference, completeness); + downloads.setDownloadStatus(reference, completeness); } req.returnValues().add(new Int32Value(retval)); } @@ -273,7 +273,7 @@ public class FileReceiver { Session session = getSession(sessionId); int retval = verifySession(session, sessionId, reference); File file = session.close(xxhash); - downloader.completedDownloading(reference, file); + downloads.completedDownloading(reference, file); synchronized (sessions) { sessions.remove(sessionId); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index cb9edb2cf20..91767c2dc66 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -12,7 +12,6 @@ import com.yahoo.vespa.config.ConnectionPool; import java.io.File; import java.time.Duration; import java.time.Instant; -import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -20,7 +19,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; /** * Downloads file reference using rpc requests to config server and keeps track of files being downloaded @@ -35,20 +33,23 @@ public class FileReferenceDownloader { Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), new DaemonThreadFactory("filereference downloader")); private final ConnectionPool connectionPool; - /* Ongoing downloads */ - private final Downloads downloads = new Downloads(); - /* Status for ongoing and finished downloads */ - private final Downloads.DownloadStatuses downloadStatuses = new Downloads.DownloadStatuses(); + private final Downloads downloads; private final Duration downloadTimeout; private final Duration sleepBetweenRetries; private final Duration rpcTimeout; - FileReferenceDownloader(File downloadDirectory, File tmpDirectory, ConnectionPool connectionPool, Duration timeout, Duration sleepBetweenRetries) { + FileReferenceDownloader(File downloadDirectory, + File tmpDirectory, + ConnectionPool connectionPool, + Downloads downloads, + Duration timeout, + Duration sleepBetweenRetries) { this.connectionPool = connectionPool; + this.downloads = downloads; this.downloadTimeout = timeout; this.sleepBetweenRetries = sleepBetweenRetries; // Needed to receive RPC calls receiveFile* from server after asking for files - new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory); + new FileReceiver(connectionPool.getSupervisor(), downloads, downloadDirectory, tmpDirectory); String timeoutString = System.getenv("VESPA_CONFIGPROXY_FILEDOWNLOAD_RPC_TIMEOUT"); this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString)); } @@ -85,24 +86,11 @@ public class FileReferenceDownloader { log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout); downloads.add(fileReferenceDownload); - downloadStatuses.add(fileReference); downloadExecutor.submit(() -> startDownload(fileReferenceDownload)); return fileReferenceDownload.future(); } - void completedDownloading(FileReference fileReference, File file) { - Optional<FileReferenceDownload> download = downloads.get(fileReference); - if (download.isPresent()) { - downloadStatuses.get(fileReference).ifPresent(Downloads.DownloadStatus::finished); - downloads.remove(fileReference); - download.get().future().complete(Optional.of(file)); - } else { - log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts"); - } - } - void failedDownloading(FileReference fileReference) { - downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0)); downloads.remove(fileReference); } @@ -150,27 +138,6 @@ public class FileReferenceDownloader { return true; } - double downloadStatus(String file) { - double status = 0.0; - Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(new FileReference(file)); - if (downloadStatus.isPresent()) { - status = downloadStatus.get().progress(); - } - return status; - } - - void setDownloadStatus(FileReference fileReference, double completeness) { - Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference); - if (downloadStatus.isPresent()) - downloadStatus.get().setProgress(completeness); - else - downloadStatuses.add(fileReference, completeness); - } - - Map<FileReference, Double> downloadStatus() { - return downloadStatuses.all().values().stream().collect(Collectors.toMap(Downloads.DownloadStatus::fileReference, Downloads.DownloadStatus::progress)); - } - public ConnectionPool connectionPool() { return connectionPool; } |