diff options
author | Harald Musum <musum@yahooinc.com> | 2024-01-22 12:00:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-22 12:00:53 +0100 |
commit | 2815c30c0c7f1d0cb16d04c22f1bdd01e1810f3f (patch) | |
tree | 2db3d2990c3338dc0bbfa9f9d21073c6a499e31e | |
parent | 9a56d4de0e25dd2b8059fb4eb5f4b2d39bb09458 (diff) | |
parent | 8734b1c8a73fbe936c71ef46b94df8e320798ed8 (diff) |
Merge pull request #30007 from vespa-engine/jonmv/clean-up-dangling-remote-sessions-too
Download application packages in parallel
4 files changed, 69 insertions, 40 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java index f18aa9acc6c..b85f9376f61 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java @@ -20,8 +20,11 @@ import com.yahoo.vespa.filedistribution.FileReferenceDownload; import java.io.File; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.Future; +import java.util.function.Supplier; import java.util.logging.Logger; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk; @@ -52,44 +55,57 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { @Override protected double maintain() { int attempts = 0; - int failures = 0; + int[] failures = new int[1]; + List<Runnable> futureDownloads = new ArrayList<>(); for (TenantName tenantName : applicationRepository.tenantRepository().getAllTenantNames()) for (Session session : applicationRepository.tenantRepository().getTenant(tenantName).getSessionRepository().getRemoteSessions()) { - if (shuttingDown()) - break; + if (shuttingDown()) + break; - switch (session.getStatus()) { - case PREPARE, ACTIVATE: break; - default: continue; - } + switch (session.getStatus()) { + case PREPARE, ACTIVATE: break; + default: continue; + } - var applicationId = session.getApplicationId(); - log.finest(() -> "Verifying application package for " + applicationId); - - Optional<FileReference> appFileReference = session.getApplicationPackageReference(); - if (appFileReference.isPresent()) { - long sessionId = session.getSessionId(); - attempts++; - if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference.get())) { - log.fine(() -> "Downloading application package with file reference " + appFileReference + - " for " + applicationId + " (session " + sessionId + ")"); - - FileReferenceDownload download = new FileReferenceDownload(appFileReference.get(), - this.getClass().getSimpleName(), - false); - if (fileDownloader.getFile(download).isEmpty()) { - failures++; - log.info("Downloading application package (" + appFileReference + ")" + - " for " + applicationId + " (session " + sessionId + ") unsuccessful. " + - "Can be ignored unless it happens many times over a long period of time, retries is expected"); - continue; + var applicationId = session.getApplicationId(); + log.finest(() -> "Verifying application package for " + applicationId); + + Optional<FileReference> appFileReference = session.getApplicationPackageReference(); + if (appFileReference.isPresent()) { + long sessionId = session.getSessionId(); + attempts++; + if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference.get())) { + log.fine(() -> "Downloading application package with file reference " + appFileReference + + " for " + applicationId + " (session " + sessionId + ")"); + + FileReferenceDownload download = new FileReferenceDownload(appFileReference.get(), + this.getClass().getSimpleName(), + false); + Future<Optional<File>> futureDownload = fileDownloader.getFutureFileOrTimeout(download); + futureDownloads.add(() -> { + try { + if (futureDownload.get().isPresent()) { + createLocalSessionIfMissing(applicationId, sessionId); + return; + } + } + catch (Exception ignored) { } + failures[0]++; + log.info("Downloading application package (" + appFileReference + ")" + + " for " + applicationId + " (session " + sessionId + ") unsuccessful. " + + "Can be ignored unless it happens many times over a long period of time, retries is expected"); + }); + } + else { + createLocalSessionIfMissing(applicationId, sessionId); } } - createLocalSessionIfMissing(applicationId, sessionId); } - } - return asSuccessFactorDeviation(attempts, failures); + + futureDownloads.forEach(Runnable::run); + + return asSuccessFactorDeviation(attempts, failures[0]); } private static FileDownloader createFileDownloader(ApplicationRepository applicationRepository, @@ -98,7 +114,7 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(applicationRepository.configserverConfig()); ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster); ConnectionPool connectionPool = new FileDistributionConnectionPool(configSourceSet, supervisor); - return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300)); + return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(60)); } @Override diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 891284a3a0e..e0a58888109 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -206,7 +206,7 @@ public class FileServerTest { super(FileDownloader.emptyConnectionPool(), new Supervisor(new Transport("mock")).setDropEmptyBuffers(true), downloadDirectory, - Duration.ofMillis(100), + Duration.ofMillis(1000), Duration.ofMillis(100)); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 2854ef8836a..72f0fb977d5 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -74,7 +74,17 @@ public class FileDownloader implements AutoCloseable { } } - Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { + /** Returns a future that times out if download takes too long, and return empty on unsuccessful download. */ + public Future<Optional<File>> getFutureFileOrTimeout(FileReferenceDownload fileReferenceDownload) { + return getFutureFile(fileReferenceDownload) + .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS) + .exceptionally(thrown -> { + fileReferenceDownloader.failedDownloading(fileReferenceDownload.fileReference()); + return Optional.empty(); + }); + } + + CompletableFuture<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); Optional<File> file = getFileFromFileSystem(fileReference); @@ -135,7 +145,7 @@ public class FileDownloader implements AutoCloseable { } /** Start downloading, the future returned will be complete()d by receiving method in {@link FileReceiver} */ - private synchronized Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) { + private synchronized CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) { return fileReferenceDownloader.startDownload(fileReferenceDownload); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 450801ce530..5ad197e8633 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -15,6 +15,7 @@ import java.time.Instant; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -67,7 +68,7 @@ public class FileReferenceDownloader { int retryCount = 0; Connection connection = connectionPool.getCurrent(); do { - backoff(retryCount); + backoff(retryCount, end); if (FileDownloader.fileReferenceExists(fileReference, downloadDirectory)) return; @@ -79,24 +80,26 @@ public class FileReferenceDownloader { // exist on just one config server, and which one could be different for each file reference), so // switch to a new connection for every retry connection = connectionPool.switchConnection(connection); - } while (retryCount < 5 || Instant.now().isAfter(end)); + } while (Instant.now().isBefore(end)); fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting " + fileReference)); downloads.remove(fileReference); } - private void backoff(int retryCount) { + private void backoff(int retryCount, Instant end) { if (retryCount > 0) { try { - long sleepTime = Math.min(120_000, (long) (Math.pow(2, retryCount)) * sleepBetweenRetries.toMillis()); - Thread.sleep(sleepTime); + long sleepTime = Math.min(120_000, + Math.min((long) (Math.pow(2, retryCount)) * sleepBetweenRetries.toMillis(), + Duration.between(Instant.now(), end).toMillis())); + if (sleepTime > 0) Thread.sleep(sleepTime); } catch (InterruptedException e) { /* ignored */ } } } - Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) { + CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); Optional<FileReferenceDownload> inProgress = downloads.get(fileReference); if (inProgress.isPresent()) return inProgress.get().future(); |