aboutsummaryrefslogtreecommitdiffstats
path: root/filedistribution/src/main/java/com/yahoo/vespa
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-06-14 19:41:50 +0200
committerHarald Musum <musum@verizonmedia.com>2021-06-14 19:41:50 +0200
commit7cb76ca350980df5117aed5e673c71aaffb69e6a (patch)
treec4f7a05ef0a303e20f53088042ce55bcf2771d5e /filedistribution/src/main/java/com/yahoo/vespa
parent4e4fc3e5160efe8549e16f50ab12e9266843e538 (diff)
Move more methods into Downloads
Diffstat (limited to 'filedistribution/src/main/java/com/yahoo/vespa')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java45
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java26
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java12
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java51
4 files changed, 72 insertions, 62 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java
index 3942b11ad03..eb0976edc40 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java
@@ -3,30 +3,73 @@ package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
+import java.io.File;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* Keeps track of downloads and download status
*
* @author hmusum
*/
-class Downloads {
+public class Downloads {
+
+ private static final Logger log = Logger.getLogger(Downloads.class.getName());
private final Map<FileReference, FileReferenceDownload> downloads = new ConcurrentHashMap<>();
+ private final DownloadStatuses downloadStatuses = new DownloadStatuses();
+
+ public DownloadStatuses downloadStatuses() { return downloadStatuses; }
+
+ void setDownloadStatus(FileReference fileReference, double completeness) {
+ Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
+ if (downloadStatus.isPresent())
+ downloadStatus.get().setProgress(completeness);
+ else
+ downloadStatuses.add(fileReference, completeness);
+ }
+
+ void completedDownloading(FileReference fileReference, File file) {
+ Optional<FileReferenceDownload> download = get(fileReference);
+ if (download.isPresent()) {
+ downloadStatuses().get(fileReference).ifPresent(Downloads.DownloadStatus::finished);
+ downloads.remove(fileReference);
+ download.get().future().complete(Optional.of(file));
+ } else {
+ log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
+ }
+ }
void add(FileReferenceDownload fileReferenceDownload) {
downloads.put(fileReferenceDownload.fileReference(), fileReferenceDownload);
+ downloadStatuses.add(fileReferenceDownload.fileReference());
}
void remove(FileReference fileReference) {
+ downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0));
downloads.remove(fileReference);
}
+ double downloadStatus(FileReference fileReference) {
+ double status = 0.0;
+ Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
+ if (downloadStatus.isPresent()) {
+ status = downloadStatus.get().progress();
+ }
+ return status;
+ }
+
+ Map<FileReference, Double> downloadStatus() {
+ return downloadStatuses.all().values().stream().collect(Collectors.toMap(Downloads.DownloadStatus::fileReference, Downloads.DownloadStatus::progress));
+ }
+
Optional<FileReferenceDownload> get(FileReference fileReference) {
return Optional.ofNullable(downloads.get(fileReference));
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index b1e43e4cee1..a0ea0d51ccb 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -33,20 +33,24 @@ public class FileDownloader implements AutoCloseable {
private final File downloadDirectory;
private final Duration timeout;
private final FileReferenceDownloader fileReferenceDownloader;
+ private final Downloads downloads;
public FileDownloader(ConnectionPool connectionPool) {
- this(connectionPool, defaultDownloadDirectory );
+ this(connectionPool, defaultDownloadDirectory, new Downloads());
}
- public FileDownloader(ConnectionPool connectionPool, File downloadDirectory) {
+ public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads) {
// TODO: Reduce timeout even more, timeout is so long that we might get starvation
- this(connectionPool, downloadDirectory, downloadDirectory, Duration.ofMinutes(5), Duration.ofSeconds(10));
+ this(connectionPool, downloadDirectory, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10));
}
- public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Duration timeout, Duration sleepBetweenRetries) {
+ public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Downloads downloads,
+ Duration timeout, Duration sleepBetweenRetries) {
this.downloadDirectory = downloadDirectory;
this.timeout = timeout;
- this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool, timeout, sleepBetweenRetries);
+ this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool,
+ downloads, timeout, sleepBetweenRetries);
+ this.downloads = downloads;
}
public Optional<File> getFile(FileReference fileReference) {
@@ -74,12 +78,8 @@ public class FileDownloader implements AutoCloseable {
: download(fileReferenceDownload);
}
- double downloadStatus(FileReference fileReference) {
- return fileReferenceDownloader.downloadStatus(fileReference.value());
- }
-
public Map<FileReference, Double> downloadStatus() {
- return fileReferenceDownloader.downloadStatus();
+ return downloads.downloadStatus();
}
File downloadDirectory() {
@@ -94,10 +94,10 @@ public class FileDownloader implements AutoCloseable {
if (!file.exists()) {
throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist");
} else if (!file.canRead()) {
- throw new RuntimeException("File reference '" + fileReference.value() + "'exists, but unable to read it");
+ throw new RuntimeException("File reference '" + fileReference.value() + "' exists, but unable to read it");
} else {
log.log(Level.FINE, () -> "File reference '" + fileReference.value() + "' found: " + file.getAbsolutePath());
- fileReferenceDownloader.setDownloadStatus(fileReference, 1.0);
+ downloads.setDownloadStatus(fileReference, 1.0);
return Optional.of(file);
}
}
@@ -106,7 +106,7 @@ public class FileDownloader implements AutoCloseable {
private boolean alreadyDownloaded(FileReference fileReference) {
try {
- return (getFileFromFileSystem(fileReference).isPresent());
+ return getFileFromFileSystem(fileReference).isPresent();
} catch (RuntimeException e) {
return false;
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 1bc44e0bed2..875ca3cac29 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -8,7 +8,6 @@ import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Supervisor;
-import java.util.logging.Level;
import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;
@@ -22,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -39,7 +39,7 @@ public class FileReceiver {
public final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof";
private final Supervisor supervisor;
- private final FileReferenceDownloader downloader;
+ private final Downloads downloads;
private final File downloadDirectory;
// Should be on same partition as downloadDirectory to make sure moving files from tmpDirectory
// to downloadDirectory is atomic
@@ -149,9 +149,9 @@ public class FileReceiver {
}
}
- FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory, File tmpDirectory) {
+ FileReceiver(Supervisor supervisor, Downloads downloads, File downloadDirectory, File tmpDirectory) {
this.supervisor = supervisor;
- this.downloader = downloader;
+ this.downloads = downloads;
this.downloadDirectory = downloadDirectory;
this.tmpDirectory = tmpDirectory;
registerMethods();
@@ -260,7 +260,7 @@ public class FileReceiver {
}
double completeness = (double) session.currentFileSize / (double) session.fileSize;
log.log(Level.FINEST, () -> String.format("%.1f percent of '%s' downloaded", completeness * 100, reference.value()));
- downloader.setDownloadStatus(reference, completeness);
+ downloads.setDownloadStatus(reference, completeness);
}
req.returnValues().add(new Int32Value(retval));
}
@@ -273,7 +273,7 @@ public class FileReceiver {
Session session = getSession(sessionId);
int retval = verifySession(session, sessionId, reference);
File file = session.close(xxhash);
- downloader.completedDownloading(reference, file);
+ downloads.completedDownloading(reference, file);
synchronized (sessions) {
sessions.remove(sessionId);
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index cb9edb2cf20..91767c2dc66 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -12,7 +12,6 @@ import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -20,7 +19,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
/**
* Downloads file reference using rpc requests to config server and keeps track of files being downloaded
@@ -35,20 +33,23 @@ public class FileReferenceDownloader {
Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
new DaemonThreadFactory("filereference downloader"));
private final ConnectionPool connectionPool;
- /* Ongoing downloads */
- private final Downloads downloads = new Downloads();
- /* Status for ongoing and finished downloads */
- private final Downloads.DownloadStatuses downloadStatuses = new Downloads.DownloadStatuses();
+ private final Downloads downloads;
private final Duration downloadTimeout;
private final Duration sleepBetweenRetries;
private final Duration rpcTimeout;
- FileReferenceDownloader(File downloadDirectory, File tmpDirectory, ConnectionPool connectionPool, Duration timeout, Duration sleepBetweenRetries) {
+ FileReferenceDownloader(File downloadDirectory,
+ File tmpDirectory,
+ ConnectionPool connectionPool,
+ Downloads downloads,
+ Duration timeout,
+ Duration sleepBetweenRetries) {
this.connectionPool = connectionPool;
+ this.downloads = downloads;
this.downloadTimeout = timeout;
this.sleepBetweenRetries = sleepBetweenRetries;
// Needed to receive RPC calls receiveFile* from server after asking for files
- new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory);
+ new FileReceiver(connectionPool.getSupervisor(), downloads, downloadDirectory, tmpDirectory);
String timeoutString = System.getenv("VESPA_CONFIGPROXY_FILEDOWNLOAD_RPC_TIMEOUT");
this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString));
}
@@ -85,24 +86,11 @@ public class FileReferenceDownloader {
log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
downloads.add(fileReferenceDownload);
- downloadStatuses.add(fileReference);
downloadExecutor.submit(() -> startDownload(fileReferenceDownload));
return fileReferenceDownload.future();
}
- void completedDownloading(FileReference fileReference, File file) {
- Optional<FileReferenceDownload> download = downloads.get(fileReference);
- if (download.isPresent()) {
- downloadStatuses.get(fileReference).ifPresent(Downloads.DownloadStatus::finished);
- downloads.remove(fileReference);
- download.get().future().complete(Optional.of(file));
- } else {
- log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
- }
- }
-
void failedDownloading(FileReference fileReference) {
- downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0));
downloads.remove(fileReference);
}
@@ -150,27 +138,6 @@ public class FileReferenceDownloader {
return true;
}
- double downloadStatus(String file) {
- double status = 0.0;
- Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(new FileReference(file));
- if (downloadStatus.isPresent()) {
- status = downloadStatus.get().progress();
- }
- return status;
- }
-
- void setDownloadStatus(FileReference fileReference, double completeness) {
- Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
- if (downloadStatus.isPresent())
- downloadStatus.get().setProgress(completeness);
- else
- downloadStatuses.add(fileReference, completeness);
- }
-
- Map<FileReference, Double> downloadStatus() {
- return downloadStatuses.all().values().stream().collect(Collectors.toMap(Downloads.DownloadStatus::fileReference, Downloads.DownloadStatus::progress));
- }
-
public ConnectionPool connectionPool() {
return connectionPool;
}