summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-01-22 11:29:10 +0100
committerjonmv <venstad@gmail.com>2024-01-22 11:29:10 +0100
commit8734b1c8a73fbe936c71ef46b94df8e320798ed8 (patch)
tree3e207bf616c4284948af50156438ffdbf59ba180
parent0fbc85c13cc832a6b5b3607e2af65a78bfb93600 (diff)
Download packages in parallel in maintainer
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java76
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java14
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java3
3 files changed, 60 insertions, 33 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
index b2c489cf074..b85f9376f61 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
@@ -20,8 +20,11 @@ import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import java.io.File;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
@@ -52,44 +55,57 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
@Override
protected double maintain() {
int attempts = 0;
- int failures = 0;
+ int[] failures = new int[1];
+ List<Runnable> futureDownloads = new ArrayList<>();
for (TenantName tenantName : applicationRepository.tenantRepository().getAllTenantNames())
for (Session session : applicationRepository.tenantRepository().getTenant(tenantName).getSessionRepository().getRemoteSessions()) {
- if (shuttingDown())
- break;
+ if (shuttingDown())
+ break;
- switch (session.getStatus()) {
- case PREPARE, ACTIVATE: break;
- default: continue;
- }
+ switch (session.getStatus()) {
+ case PREPARE, ACTIVATE: break;
+ default: continue;
+ }
- var applicationId = session.getApplicationId();
- log.finest(() -> "Verifying application package for " + applicationId);
-
- Optional<FileReference> appFileReference = session.getApplicationPackageReference();
- if (appFileReference.isPresent()) {
- long sessionId = session.getSessionId();
- attempts++;
- if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference.get())) {
- log.fine(() -> "Downloading application package with file reference " + appFileReference +
- " for " + applicationId + " (session " + sessionId + ")");
-
- FileReferenceDownload download = new FileReferenceDownload(appFileReference.get(),
- this.getClass().getSimpleName(),
- false);
- if (fileDownloader.getFile(download).isEmpty()) {
- failures++;
- log.info("Downloading application package (" + appFileReference + ")" +
- " for " + applicationId + " (session " + sessionId + ") unsuccessful. " +
- "Can be ignored unless it happens many times over a long period of time, retries is expected");
- continue;
+ var applicationId = session.getApplicationId();
+ log.finest(() -> "Verifying application package for " + applicationId);
+
+ Optional<FileReference> appFileReference = session.getApplicationPackageReference();
+ if (appFileReference.isPresent()) {
+ long sessionId = session.getSessionId();
+ attempts++;
+ if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference.get())) {
+ log.fine(() -> "Downloading application package with file reference " + appFileReference +
+ " for " + applicationId + " (session " + sessionId + ")");
+
+ FileReferenceDownload download = new FileReferenceDownload(appFileReference.get(),
+ this.getClass().getSimpleName(),
+ false);
+ Future<Optional<File>> futureDownload = fileDownloader.getFutureFileOrTimeout(download);
+ futureDownloads.add(() -> {
+ try {
+ if (futureDownload.get().isPresent()) {
+ createLocalSessionIfMissing(applicationId, sessionId);
+ return;
+ }
+ }
+ catch (Exception ignored) { }
+ failures[0]++;
+ log.info("Downloading application package (" + appFileReference + ")" +
+ " for " + applicationId + " (session " + sessionId + ") unsuccessful. " +
+ "Can be ignored unless it happens many times over a long period of time, retries is expected");
+ });
+ }
+ else {
+ createLocalSessionIfMissing(applicationId, sessionId);
}
}
- createLocalSessionIfMissing(applicationId, sessionId);
}
- }
- return asSuccessFactorDeviation(attempts, failures);
+
+ futureDownloads.forEach(Runnable::run);
+
+ return asSuccessFactorDeviation(attempts, failures[0]);
}
private static FileDownloader createFileDownloader(ApplicationRepository applicationRepository,
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 2854ef8836a..72f0fb977d5 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -74,7 +74,17 @@ public class FileDownloader implements AutoCloseable {
}
}
- Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
+ /** Returns a future that times out if download takes too long, and return empty on unsuccessful download. */
+ public Future<Optional<File>> getFutureFileOrTimeout(FileReferenceDownload fileReferenceDownload) {
+ return getFutureFile(fileReferenceDownload)
+ .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
+ .exceptionally(thrown -> {
+ fileReferenceDownloader.failedDownloading(fileReferenceDownload.fileReference());
+ return Optional.empty();
+ });
+ }
+
+ CompletableFuture<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Optional<File> file = getFileFromFileSystem(fileReference);
@@ -135,7 +145,7 @@ public class FileDownloader implements AutoCloseable {
}
/** Start downloading, the future returned will be complete()d by receiving method in {@link FileReceiver} */
- private synchronized Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
+ private synchronized CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
return fileReferenceDownloader.startDownload(fileReferenceDownload);
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index e26936cd5cf..5ad197e8633 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -15,6 +15,7 @@ import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -98,7 +99,7 @@ public class FileReferenceDownloader {
}
}
- Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
+ CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Optional<FileReferenceDownload> inProgress = downloads.get(fileReference);
if (inProgress.isPresent()) return inProgress.get().future();