summaryrefslogtreecommitdiffstats
path: root/configserver/src/main
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-11-04 14:23:53 +0100
committerHarald Musum <musum@yahooinc.com>2021-11-04 14:23:53 +0100
commitc3c0e83ce6c50071114861250ee81b007218515a (patch)
treefba7e240fb99dd572c3fb3247b2abc04603ad01c /configserver/src/main
parentaca3d8e19bdcd75736f25063b30bfbf6715c3512 (diff)
Wait for futures to finish
Diffstat (limited to 'configserver/src/main')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java56
1 files changed, 35 insertions, 21 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
index e978384bf09..6f7a3045478 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
@@ -20,8 +20,12 @@ import com.yahoo.vespa.flags.FlagSource;
import java.io.File;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
@@ -61,6 +65,7 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
final AtomicInteger attempts = new AtomicInteger(0);
final AtomicInteger failures = new AtomicInteger(0);
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
try (var fileDownloader = createFileDownloader()) {
for (var applicationId : applicationRepository.listApplications()) {
log.fine(() -> "Verifying application package for " + applicationId);
@@ -68,30 +73,39 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
if (session == null) continue; // App might be deleted after call to listApplications()
FileReference applicationPackage = session.getApplicationPackageReference();
- long sessionId = session.getSessionId();
- log.fine(() -> "Verifying application package file reference " + applicationPackage + " for session " + sessionId);
-
- if (applicationPackage != null) {
- if (! fileReferenceExistsOnDisk(downloadDirectory, applicationPackage)) {
- log.fine(() -> "Downloading missing application package for application " + applicationId + " (session " + sessionId + ")");
-
- FileReferenceDownload download = new FileReferenceDownload(applicationPackage,
- false,
- this.getClass().getSimpleName());
- CompletableFuture.supplyAsync(() -> fileDownloader.getFile(download))
- .thenAccept(file -> {
- if (file.isPresent()) {
- attempts.incrementAndGet();
- createLocalSessionIfMissing(applicationId, sessionId);
- } else {
- failures.incrementAndGet();
- log.warning("Failed to download application package for application " + applicationId + " (session " + sessionId + ")");
- }
- });
- }
+ if (applicationPackage == null) continue;
+
+ if ( ! fileReferenceExistsOnDisk(downloadDirectory, applicationPackage)) {
+ long sessionId = session.getSessionId();
+ log.fine(() -> "Downloading application package for " + applicationId +
+ " application package reference " + applicationPackage +
+ " (session " + sessionId + ")");
+
+ FileReferenceDownload download = new FileReferenceDownload(applicationPackage,
+ false,
+ this.getClass().getSimpleName());
+ futures.add(CompletableFuture.supplyAsync(() -> fileDownloader.getFile(download))
+ .thenAccept(file -> {
+ if (file.isPresent()) {
+ attempts.incrementAndGet();
+ createLocalSessionIfMissing(applicationId, sessionId);
+ } else {
+ failures.incrementAndGet();
+ log.warning("Failed to download application package for application " +
+ applicationId + " (session " + sessionId + ")");
+ }
+ }));
}
}
}
+ log.fine(() -> "Attempts: " + attempts.get() + ", failures: " + failures.get());
+ futures.forEach(future -> {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.log(Level.WARNING, "Failed to get future", e);
+ }
+ });
return asSuccessFactor(attempts.get(), failures.get());
}