summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-06-14 19:41:50 +0200
committerHarald Musum <musum@verizonmedia.com>2021-06-14 19:41:50 +0200
commit7cb76ca350980df5117aed5e673c71aaffb69e6a (patch)
treec4f7a05ef0a303e20f53088042ce55bcf2771d5e /filedistribution
parent4e4fc3e5160efe8549e16f50ab12e9266843e538 (diff)
Move more methods into Downloads
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java45
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java26
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java12
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java51
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java36
5 files changed, 91 insertions, 79 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java
index 3942b11ad03..eb0976edc40 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/Downloads.java
@@ -3,30 +3,73 @@ package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
+import java.io.File;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* Keeps track of downloads and download status
*
* @author hmusum
*/
-class Downloads {
+public class Downloads {
+
+ private static final Logger log = Logger.getLogger(Downloads.class.getName());
private final Map<FileReference, FileReferenceDownload> downloads = new ConcurrentHashMap<>();
+ private final DownloadStatuses downloadStatuses = new DownloadStatuses();
+
+ public DownloadStatuses downloadStatuses() { return downloadStatuses; }
+
+ void setDownloadStatus(FileReference fileReference, double completeness) {
+ Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
+ if (downloadStatus.isPresent())
+ downloadStatus.get().setProgress(completeness);
+ else
+ downloadStatuses.add(fileReference, completeness);
+ }
+
+ void completedDownloading(FileReference fileReference, File file) {
+ Optional<FileReferenceDownload> download = get(fileReference);
+ if (download.isPresent()) {
+ downloadStatuses().get(fileReference).ifPresent(Downloads.DownloadStatus::finished);
+ downloads.remove(fileReference);
+ download.get().future().complete(Optional.of(file));
+ } else {
+ log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
+ }
+ }
void add(FileReferenceDownload fileReferenceDownload) {
downloads.put(fileReferenceDownload.fileReference(), fileReferenceDownload);
+ downloadStatuses.add(fileReferenceDownload.fileReference());
}
void remove(FileReference fileReference) {
+ downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0));
downloads.remove(fileReference);
}
+ double downloadStatus(FileReference fileReference) {
+ double status = 0.0;
+ Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
+ if (downloadStatus.isPresent()) {
+ status = downloadStatus.get().progress();
+ }
+ return status;
+ }
+
+ Map<FileReference, Double> downloadStatus() {
+ return downloadStatuses.all().values().stream().collect(Collectors.toMap(Downloads.DownloadStatus::fileReference, Downloads.DownloadStatus::progress));
+ }
+
Optional<FileReferenceDownload> get(FileReference fileReference) {
return Optional.ofNullable(downloads.get(fileReference));
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index b1e43e4cee1..a0ea0d51ccb 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -33,20 +33,24 @@ public class FileDownloader implements AutoCloseable {
private final File downloadDirectory;
private final Duration timeout;
private final FileReferenceDownloader fileReferenceDownloader;
+ private final Downloads downloads;
public FileDownloader(ConnectionPool connectionPool) {
- this(connectionPool, defaultDownloadDirectory );
+ this(connectionPool, defaultDownloadDirectory, new Downloads());
}
- public FileDownloader(ConnectionPool connectionPool, File downloadDirectory) {
+ public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Downloads downloads) {
// TODO: Reduce timeout even more, timeout is so long that we might get starvation
- this(connectionPool, downloadDirectory, downloadDirectory, Duration.ofMinutes(5), Duration.ofSeconds(10));
+ this(connectionPool, downloadDirectory, downloadDirectory, downloads, Duration.ofMinutes(5), Duration.ofSeconds(10));
}
- public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Duration timeout, Duration sleepBetweenRetries) {
+ public FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Downloads downloads,
+ Duration timeout, Duration sleepBetweenRetries) {
this.downloadDirectory = downloadDirectory;
this.timeout = timeout;
- this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool, timeout, sleepBetweenRetries);
+ this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool,
+ downloads, timeout, sleepBetweenRetries);
+ this.downloads = downloads;
}
public Optional<File> getFile(FileReference fileReference) {
@@ -74,12 +78,8 @@ public class FileDownloader implements AutoCloseable {
: download(fileReferenceDownload);
}
- double downloadStatus(FileReference fileReference) {
- return fileReferenceDownloader.downloadStatus(fileReference.value());
- }
-
public Map<FileReference, Double> downloadStatus() {
- return fileReferenceDownloader.downloadStatus();
+ return downloads.downloadStatus();
}
File downloadDirectory() {
@@ -94,10 +94,10 @@ public class FileDownloader implements AutoCloseable {
if (!file.exists()) {
throw new RuntimeException("File reference '" + fileReference.value() + "' does not exist");
} else if (!file.canRead()) {
- throw new RuntimeException("File reference '" + fileReference.value() + "'exists, but unable to read it");
+ throw new RuntimeException("File reference '" + fileReference.value() + "' exists, but unable to read it");
} else {
log.log(Level.FINE, () -> "File reference '" + fileReference.value() + "' found: " + file.getAbsolutePath());
- fileReferenceDownloader.setDownloadStatus(fileReference, 1.0);
+ downloads.setDownloadStatus(fileReference, 1.0);
return Optional.of(file);
}
}
@@ -106,7 +106,7 @@ public class FileDownloader implements AutoCloseable {
private boolean alreadyDownloaded(FileReference fileReference) {
try {
- return (getFileFromFileSystem(fileReference).isPresent());
+ return getFileFromFileSystem(fileReference).isPresent();
} catch (RuntimeException e) {
return false;
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 1bc44e0bed2..875ca3cac29 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -8,7 +8,6 @@ import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Supervisor;
-import java.util.logging.Level;
import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;
@@ -22,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -39,7 +39,7 @@ public class FileReceiver {
public final static String RECEIVE_EOF_METHOD = "filedistribution.receiveFileEof";
private final Supervisor supervisor;
- private final FileReferenceDownloader downloader;
+ private final Downloads downloads;
private final File downloadDirectory;
// Should be on same partition as downloadDirectory to make sure moving files from tmpDirectory
// to downloadDirectory is atomic
@@ -149,9 +149,9 @@ public class FileReceiver {
}
}
- FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory, File tmpDirectory) {
+ FileReceiver(Supervisor supervisor, Downloads downloads, File downloadDirectory, File tmpDirectory) {
this.supervisor = supervisor;
- this.downloader = downloader;
+ this.downloads = downloads;
this.downloadDirectory = downloadDirectory;
this.tmpDirectory = tmpDirectory;
registerMethods();
@@ -260,7 +260,7 @@ public class FileReceiver {
}
double completeness = (double) session.currentFileSize / (double) session.fileSize;
log.log(Level.FINEST, () -> String.format("%.1f percent of '%s' downloaded", completeness * 100, reference.value()));
- downloader.setDownloadStatus(reference, completeness);
+ downloads.setDownloadStatus(reference, completeness);
}
req.returnValues().add(new Int32Value(retval));
}
@@ -273,7 +273,7 @@ public class FileReceiver {
Session session = getSession(sessionId);
int retval = verifySession(session, sessionId, reference);
File file = session.close(xxhash);
- downloader.completedDownloading(reference, file);
+ downloads.completedDownloading(reference, file);
synchronized (sessions) {
sessions.remove(sessionId);
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index cb9edb2cf20..91767c2dc66 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -12,7 +12,6 @@ import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -20,7 +19,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
/**
* Downloads file reference using rpc requests to config server and keeps track of files being downloaded
@@ -35,20 +33,23 @@ public class FileReferenceDownloader {
Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
new DaemonThreadFactory("filereference downloader"));
private final ConnectionPool connectionPool;
- /* Ongoing downloads */
- private final Downloads downloads = new Downloads();
- /* Status for ongoing and finished downloads */
- private final Downloads.DownloadStatuses downloadStatuses = new Downloads.DownloadStatuses();
+ private final Downloads downloads;
private final Duration downloadTimeout;
private final Duration sleepBetweenRetries;
private final Duration rpcTimeout;
- FileReferenceDownloader(File downloadDirectory, File tmpDirectory, ConnectionPool connectionPool, Duration timeout, Duration sleepBetweenRetries) {
+ FileReferenceDownloader(File downloadDirectory,
+ File tmpDirectory,
+ ConnectionPool connectionPool,
+ Downloads downloads,
+ Duration timeout,
+ Duration sleepBetweenRetries) {
this.connectionPool = connectionPool;
+ this.downloads = downloads;
this.downloadTimeout = timeout;
this.sleepBetweenRetries = sleepBetweenRetries;
// Needed to receive RPC calls receiveFile* from server after asking for files
- new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory);
+ new FileReceiver(connectionPool.getSupervisor(), downloads, downloadDirectory, tmpDirectory);
String timeoutString = System.getenv("VESPA_CONFIGPROXY_FILEDOWNLOAD_RPC_TIMEOUT");
this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString));
}
@@ -85,24 +86,11 @@ public class FileReferenceDownloader {
log.log(Level.FINE, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
downloads.add(fileReferenceDownload);
- downloadStatuses.add(fileReference);
downloadExecutor.submit(() -> startDownload(fileReferenceDownload));
return fileReferenceDownload.future();
}
- void completedDownloading(FileReference fileReference, File file) {
- Optional<FileReferenceDownload> download = downloads.get(fileReference);
- if (download.isPresent()) {
- downloadStatuses.get(fileReference).ifPresent(Downloads.DownloadStatus::finished);
- downloads.remove(fileReference);
- download.get().future().complete(Optional.of(file));
- } else {
- log.log(Level.FINE, () -> "Received '" + fileReference + "', which was not requested. Can be ignored if happening during upgrades/restarts");
- }
- }
-
void failedDownloading(FileReference fileReference) {
- downloadStatuses.get(fileReference).ifPresent(d -> d.setProgress(0.0));
downloads.remove(fileReference);
}
@@ -150,27 +138,6 @@ public class FileReferenceDownloader {
return true;
}
- double downloadStatus(String file) {
- double status = 0.0;
- Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(new FileReference(file));
- if (downloadStatus.isPresent()) {
- status = downloadStatus.get().progress();
- }
- return status;
- }
-
- void setDownloadStatus(FileReference fileReference, double completeness) {
- Optional<Downloads.DownloadStatus> downloadStatus = downloadStatuses.get(fileReference);
- if (downloadStatus.isPresent())
- downloadStatus.get().setProgress(completeness);
- else
- downloadStatuses.add(fileReference, completeness);
- }
-
- Map<FileReference, Double> downloadStatus() {
- return downloadStatuses.all().values().stream().collect(Collectors.toMap(Downloads.DownloadStatus::fileReference, Downloads.DownloadStatus::progress));
- }
-
public ConnectionPool connectionPool() {
return connectionPool;
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index 61575b650ce..875159be8f9 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -1,5 +1,4 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
@@ -42,6 +41,7 @@ public class FileDownloaderTest {
private static final Duration sleepBetweenRetries = Duration.ofMillis(10);
private MockConnection connection;
+ private Downloads downloads;
private FileDownloader fileDownloader;
private File downloadDir;
private File tempDir;
@@ -52,7 +52,9 @@ public class FileDownloaderTest {
downloadDir = Files.createTempDirectory("filedistribution").toFile();
tempDir = Files.createTempDirectory("download").toFile();
connection = new MockConnection();
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(1), sleepBetweenRetries);
+ downloads = new Downloads();
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, downloads,
+ Duration.ofSeconds(1), sleepBetweenRetries);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -85,7 +87,7 @@ public class FileDownloaderTest {
assertEquals("content", IOUtils.readFile(pathToFile.get()));
// Verify download status when downloaded
- assertDownloadStatus(fileDownloader, fileReference, 1.0);
+ assertDownloadStatus(fileReference, 1.0);
}
{
@@ -98,7 +100,7 @@ public class FileDownloaderTest {
assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
// Verify download status when unable to download
- assertDownloadStatus(fileDownloader, fileReference, 0.0);
+ assertDownloadStatus(fileReference, 0.0);
}
{
@@ -109,7 +111,7 @@ public class FileDownloaderTest {
assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
// Verify download status
- assertDownloadStatus(fileDownloader, fileReference, 0.0);
+ assertDownloadStatus(fileReference, 0.0);
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
@@ -122,7 +124,7 @@ public class FileDownloaderTest {
assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
// Verify download status when downloaded
- assertDownloadStatus(fileDownloader, fileReference, 1.0);
+ assertDownloadStatus(fileReference, 1.0);
}
{
@@ -133,7 +135,7 @@ public class FileDownloaderTest {
assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
// Verify download status
- assertDownloadStatus(fileDownloader, fileReference, 0.0);
+ assertDownloadStatus(fileReference, 0.0);
// Receives fileReference, should return and make it available to caller
String filename = "abc.tar.gz";
@@ -157,13 +159,13 @@ public class FileDownloaderTest {
assertEquals("bar", IOUtils.readFile(downloadedBar));
// Verify download status when downloaded
- assertDownloadStatus(fileDownloader, fileReference, 1.0);
+ assertDownloadStatus(fileReference, 1.0);
}
}
@Test
public void getFileWhenConnectionError() throws IOException {
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(2), sleepBetweenRetries);
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, downloads, Duration.ofSeconds(2), sleepBetweenRetries);
File downloadDir = fileDownloader.downloadDirectory();
int timesToFail = 2;
@@ -175,7 +177,7 @@ public class FileDownloaderTest {
assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
// Getting file failed, verify download status and since there was an error is not downloading ATM
- assertDownloadStatus(fileDownloader, fileReference, 0.0);
+ assertDownloadStatus(fileReference, 0.0);
assertFalse(fileDownloader.fileReferenceDownloader().isDownloading(fileReference));
// Receives fileReference, should return and make it available to caller
@@ -188,7 +190,7 @@ public class FileDownloaderTest {
assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
// Verify download status when downloaded
- assertDownloadStatus(fileDownloader, fileReference, 1.0);
+ assertDownloadStatus(fileReference, 1.0);
assertEquals(timesToFail, responseHandler.failedTimes);
}
@@ -197,7 +199,7 @@ public class FileDownloaderTest {
public void getFileWhenDownloadInProgress() throws IOException, ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
String filename = "abc.jar";
- fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), sleepBetweenRetries);
+ fileDownloader = new FileDownloader(connection, downloadDir, tempDir, downloads, Duration.ofSeconds(3), sleepBetweenRetries);
File downloadDir = fileDownloader.downloadDirectory();
// Delay response so that we can make a second request while downloading the file from the first request
@@ -237,7 +239,7 @@ public class FileDownloaderTest {
Duration timeout = Duration.ofMillis(200);
MockConnection connectionPool = new MockConnection();
connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
- FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries);
+ FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, downloads, timeout, sleepBetweenRetries);
FileReference foo = new FileReference("foo");
// Should download since we do not have the file on disk
fileDownloader.downloadIfNeeded(new FileReferenceDownload(foo));
@@ -271,8 +273,8 @@ public class FileDownloaderTest {
return new File(dir, fileReference.value());
}
- private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) {
- double downloadStatus = fileDownloader.downloadStatus(fileReference);
+ private void assertDownloadStatus(FileReference fileReference, double expectedDownloadStatus) {
+ double downloadStatus = downloads.downloadStatus(fileReference);
assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
}
@@ -288,7 +290,7 @@ public class FileDownloaderTest {
new FileReceiver.Session(downloadDir, tempDir, 1, fileReference, type, filename, content.length);
session.addPart(0, content);
File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0));
- fileDownloader.fileReferenceDownloader().completedDownloading(fileReference, file);
+ downloads.completedDownloading(fileReference, file);
}
private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {