summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java11
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java37
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java20
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java13
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java5
5 files changed, 46 insertions, 40 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java
index f24d76fe8be..3b15d5c83b5 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java
@@ -13,14 +13,13 @@ import com.yahoo.jrt.Supervisor;
import com.yahoo.log.LogLevel;
import java.io.File;
-import java.util.List;
+import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* An RPC server that handles file distribution requests.
@@ -104,12 +103,10 @@ public class FileDistributionRpcServer {
@SuppressWarnings({"UnusedDeclaration"})
public final void setFileReferencesToDownload(Request req) {
- String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
- List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
+ Arrays.stream(req.parameters().get(0).asStringArray())
.map(FileReference::new)
- .collect(Collectors.toList());
- downloader.queueForAsyncDownload(fileReferences);
-
+ .forEach(fileReference -> downloader.queueForAsyncDownload(
+ new FileReferenceDownload(fileReference)));
req.returnValues().add(new Int32Value(0));
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 02d6e1ad022..f2a82ac6ead 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -10,7 +10,6 @@ import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -48,16 +47,21 @@ public class FileDownloader {
}
public Optional<File> getFile(FileReference fileReference) {
+ return getFile(new FileReferenceDownload(fileReference));
+ }
+
+ public Optional<File> getFile(FileReferenceDownload fileReferenceDownload) {
try {
- return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ return getFutureFile(fileReferenceDownload).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.log(LogLevel.WARNING, "Failed downloading file, removing from download queue: " + e.getMessage());
- fileReferenceDownloader.failedDownloading(fileReference);
+ log.log(LogLevel.WARNING, "Failed downloading '" + fileReferenceDownload.fileReference().value() + "', removing from download queue: " + e.getMessage());
+ fileReferenceDownloader.failedDownloading(fileReferenceDownload.fileReference());
return Optional.empty();
}
}
- private Future<Optional<File>> getFutureFile(FileReference fileReference) {
+ private Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
+ FileReference fileReference = fileReferenceDownload.fileReference();
Objects.requireNonNull(fileReference, "file reference cannot be null");
File directory = new File(downloadDirectory, fileReference.value());
log.log(LogLevel.DEBUG, () -> "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
@@ -70,21 +74,10 @@ public class FileDownloader {
} else {
log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' not found in " +
directory.getAbsolutePath() + ", starting download");
- return queueForAsyncDownload(fileReference, timeout);
+ return queueForAsyncDownload(fileReferenceDownload, timeout);
}
}
- // Start downloading, but there is no Future used get file being downloaded
- public void queueForAsyncDownload(List<FileReference> fileReferences) {
- fileReferences.forEach(fileReference -> {
- if (fileReferenceDownloader.isDownloading(fileReference)) {
- log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'");
- } else {
- queueForAsyncDownload(fileReference);
- }
- });
- }
-
void receiveFile(FileReferenceData fileReferenceData) {
fileReferenceDownloader.receiveFile(fileReferenceData);
}
@@ -117,20 +110,20 @@ public class FileDownloader {
return Optional.empty();
}
- private synchronized Future<Optional<File>> queueForAsyncDownload(FileReference fileReference, Duration timeout) {
- Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
+ public synchronized Future<Optional<File>> queueForAsyncDownload(FileReferenceDownload fileReferenceDownload, Duration timeout) {
+ FileReference fileReference = fileReferenceDownload.fileReference();
+ Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReferenceDownload));
if (inProgress != null) {
log.log(LogLevel.DEBUG, () -> "Already downloading '" + fileReference.value() + "'");
return inProgress;
}
- Future<Optional<File>> future = queueForAsyncDownload(fileReference);
+ Future<Optional<File>> future = queueForAsyncDownload(fileReferenceDownload);
log.log(LogLevel.DEBUG, () -> "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
return future;
}
- private Future<Optional<File>> queueForAsyncDownload(FileReference fileReference) {
- FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference, SettableFuture.create());
+ public Future<Optional<File>> queueForAsyncDownload(FileReferenceDownload fileReferenceDownload) {
fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
return fileReferenceDownload.future();
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index 904eaae8c4a..4a9fadf1a61 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -8,19 +8,33 @@ import com.yahoo.config.FileReference;
import java.io.File;
import java.util.Optional;
-class FileReferenceDownload {
+public class FileReferenceDownload {
+
private final FileReference fileReference;
private final SettableFuture<Optional<File>> future;
+ // If a config server wants to download from another config server (because it does not have the
+ // file itself) we set this flag to true to avoid an eternal loop
+ private final boolean downloadFromOtherSourceIfNotFound;
+
+ public FileReferenceDownload(FileReference fileReference) {
+ this(fileReference, true);
+ }
- FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) {
+ public FileReferenceDownload(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound) {
this.fileReference = fileReference;
- this.future = future;
+ this.future = SettableFuture.create();
+ this.downloadFromOtherSourceIfNotFound = downloadFromOtherSourceIfNotFound;
}
FileReference fileReference() {
return fileReference;
}
+
SettableFuture<Optional<File>> future() {
return future;
}
+
+ boolean downloadFromOtherSourceIfNotFound() {
+ return downloadFromOtherSourceIfNotFound;
+ }
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 2084c00d31b..1008df229f1 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -6,6 +6,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.ErrorCode;
+import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.StringValue;
import com.yahoo.log.LogLevel;
@@ -58,7 +59,7 @@ public class FileReferenceDownloader {
boolean downloadStarted = false;
while ((System.currentTimeMillis() < end) && !downloadStarted) {
try {
- if (startDownloadRpc(fileReference)) {
+ if (startDownloadRpc(fileReferenceDownload)) {
downloadStarted = true;
} else {
Thread.sleep(sleepBetweenRetries.toMillis());
@@ -109,19 +110,21 @@ public class FileReferenceDownloader {
}
}
- private boolean startDownloadRpc(FileReference fileReference) {
+ private boolean startDownloadRpc(FileReferenceDownload fileReferenceDownload) {
Connection connection = connectionPool.getCurrent();
Request request = new Request("filedistribution.serveFile");
- request.parameters().add(new StringValue(fileReference.value()));
+ String fileReference = fileReferenceDownload.fileReference().value();
+ request.parameters().add(new StringValue(fileReference));
+ request.parameters().add(new Int32Value(fileReferenceDownload.downloadFromOtherSourceIfNotFound() ? 0 : 1));
execute(request, connection);
if (validateResponse(request)) {
log.log(LogLevel.DEBUG, () -> "Request callback, OK. Req: " + request + "\nSpec: " + connection);
if (request.returnValues().get(0).asInt32() == 0) {
- log.log(LogLevel.DEBUG, () -> "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress());
+ log.log(LogLevel.DEBUG, () -> "Found file reference '" + fileReference + "' available at " + connection.getAddress());
return true;
} else {
- log.log(LogLevel.DEBUG, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress());
+ log.log(LogLevel.DEBUG, "File reference '" + fileReference + "' not found for " + connection.getAddress());
connectionPool.setNewCurrentConnection();
return false;
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index d433dc25105..e0ef2ecf7e4 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -22,7 +22,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
import static com.yahoo.jrt.ErrorCode.CONNECTION;
@@ -190,8 +189,8 @@ public class FileDownloaderTest {
FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries);
FileReference foo = new FileReference("foo");
FileReference bar = new FileReference("bar");
- List<FileReference> fileReferences = Arrays.asList(foo, bar);
- fileDownloader.queueForAsyncDownload(fileReferences);
+ fileDownloader.queueForAsyncDownload(new FileReferenceDownload(foo));
+ fileDownloader.queueForAsyncDownload(new FileReferenceDownload(bar));
// Verify download status
assertDownloadStatus(fileDownloader, foo, 0.0);