aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2024-01-22 12:00:53 +0100
committerGitHub <noreply@github.com>2024-01-22 12:00:53 +0100
commit2815c30c0c7f1d0cb16d04c22f1bdd01e1810f3f (patch)
tree2db3d2990c3338dc0bbfa9f9d21073c6a499e31e
parent9a56d4de0e25dd2b8059fb4eb5f4b2d39bb09458 (diff)
parent8734b1c8a73fbe936c71ef46b94df8e320798ed8 (diff)
Merge pull request #30007 from vespa-engine/jonmv/clean-up-dangling-remote-sessions-too
Download application packages in parallel
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java78
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java2
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java14
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java15
4 files changed, 69 insertions, 40 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
index f18aa9acc6c..b85f9376f61 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
@@ -20,8 +20,11 @@ import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import java.io.File;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
@@ -52,44 +55,57 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
@Override
protected double maintain() {
int attempts = 0;
- int failures = 0;
+ int[] failures = new int[1];
+ List<Runnable> futureDownloads = new ArrayList<>();
for (TenantName tenantName : applicationRepository.tenantRepository().getAllTenantNames())
for (Session session : applicationRepository.tenantRepository().getTenant(tenantName).getSessionRepository().getRemoteSessions()) {
- if (shuttingDown())
- break;
+ if (shuttingDown())
+ break;
- switch (session.getStatus()) {
- case PREPARE, ACTIVATE: break;
- default: continue;
- }
+ switch (session.getStatus()) {
+ case PREPARE, ACTIVATE: break;
+ default: continue;
+ }
- var applicationId = session.getApplicationId();
- log.finest(() -> "Verifying application package for " + applicationId);
-
- Optional<FileReference> appFileReference = session.getApplicationPackageReference();
- if (appFileReference.isPresent()) {
- long sessionId = session.getSessionId();
- attempts++;
- if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference.get())) {
- log.fine(() -> "Downloading application package with file reference " + appFileReference +
- " for " + applicationId + " (session " + sessionId + ")");
-
- FileReferenceDownload download = new FileReferenceDownload(appFileReference.get(),
- this.getClass().getSimpleName(),
- false);
- if (fileDownloader.getFile(download).isEmpty()) {
- failures++;
- log.info("Downloading application package (" + appFileReference + ")" +
- " for " + applicationId + " (session " + sessionId + ") unsuccessful. " +
- "Can be ignored unless it happens many times over a long period of time, retries is expected");
- continue;
+ var applicationId = session.getApplicationId();
+ log.finest(() -> "Verifying application package for " + applicationId);
+
+ Optional<FileReference> appFileReference = session.getApplicationPackageReference();
+ if (appFileReference.isPresent()) {
+ long sessionId = session.getSessionId();
+ attempts++;
+ if (!fileReferenceExistsOnDisk(downloadDirectory, appFileReference.get())) {
+ log.fine(() -> "Downloading application package with file reference " + appFileReference +
+ " for " + applicationId + " (session " + sessionId + ")");
+
+ FileReferenceDownload download = new FileReferenceDownload(appFileReference.get(),
+ this.getClass().getSimpleName(),
+ false);
+ Future<Optional<File>> futureDownload = fileDownloader.getFutureFileOrTimeout(download);
+ futureDownloads.add(() -> {
+ try {
+ if (futureDownload.get().isPresent()) {
+ createLocalSessionIfMissing(applicationId, sessionId);
+ return;
+ }
+ }
+ catch (Exception ignored) { }
+ failures[0]++;
+ log.info("Downloading application package (" + appFileReference + ")" +
+ " for " + applicationId + " (session " + sessionId + ") unsuccessful. " +
+ "Can be ignored unless it happens many times over a long period of time, retries is expected");
+ });
+ }
+ else {
+ createLocalSessionIfMissing(applicationId, sessionId);
}
}
- createLocalSessionIfMissing(applicationId, sessionId);
}
- }
- return asSuccessFactorDeviation(attempts, failures);
+
+ futureDownloads.forEach(Runnable::run);
+
+ return asSuccessFactorDeviation(attempts, failures[0]);
}
private static FileDownloader createFileDownloader(ApplicationRepository applicationRepository,
@@ -98,7 +114,7 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(applicationRepository.configserverConfig());
ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster);
ConnectionPool connectionPool = new FileDistributionConnectionPool(configSourceSet, supervisor);
- return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300));
+ return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(60));
}
@Override
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
index 891284a3a0e..e0a58888109 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
@@ -206,7 +206,7 @@ public class FileServerTest {
super(FileDownloader.emptyConnectionPool(),
new Supervisor(new Transport("mock")).setDropEmptyBuffers(true),
downloadDirectory,
- Duration.ofMillis(100),
+ Duration.ofMillis(1000),
Duration.ofMillis(100));
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 2854ef8836a..72f0fb977d5 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -74,7 +74,17 @@ public class FileDownloader implements AutoCloseable {
}
}
- Future<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
+ /** Returns a future that times out if download takes too long, and return empty on unsuccessful download. */
+ public Future<Optional<File>> getFutureFileOrTimeout(FileReferenceDownload fileReferenceDownload) {
+ return getFutureFile(fileReferenceDownload)
+ .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
+ .exceptionally(thrown -> {
+ fileReferenceDownloader.failedDownloading(fileReferenceDownload.fileReference());
+ return Optional.empty();
+ });
+ }
+
+ CompletableFuture<Optional<File>> getFutureFile(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Optional<File> file = getFileFromFileSystem(fileReference);
@@ -135,7 +145,7 @@ public class FileDownloader implements AutoCloseable {
}
/** Start downloading, the future returned will be complete()d by receiving method in {@link FileReceiver} */
- private synchronized Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
+ private synchronized CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
return fileReferenceDownloader.startDownload(fileReferenceDownload);
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 450801ce530..5ad197e8633 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -15,6 +15,7 @@ import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -67,7 +68,7 @@ public class FileReferenceDownloader {
int retryCount = 0;
Connection connection = connectionPool.getCurrent();
do {
- backoff(retryCount);
+ backoff(retryCount, end);
if (FileDownloader.fileReferenceExists(fileReference, downloadDirectory))
return;
@@ -79,24 +80,26 @@ public class FileReferenceDownloader {
// exist on just one config server, and which one could be different for each file reference), so
// switch to a new connection for every retry
connection = connectionPool.switchConnection(connection);
- } while (retryCount < 5 || Instant.now().isAfter(end));
+ } while (Instant.now().isBefore(end));
fileReferenceDownload.future().completeExceptionally(new RuntimeException("Failed getting " + fileReference));
downloads.remove(fileReference);
}
- private void backoff(int retryCount) {
+ private void backoff(int retryCount, Instant end) {
if (retryCount > 0) {
try {
- long sleepTime = Math.min(120_000, (long) (Math.pow(2, retryCount)) * sleepBetweenRetries.toMillis());
- Thread.sleep(sleepTime);
+ long sleepTime = Math.min(120_000,
+ Math.min((long) (Math.pow(2, retryCount)) * sleepBetweenRetries.toMillis(),
+ Duration.between(Instant.now(), end).toMillis()));
+ if (sleepTime > 0) Thread.sleep(sleepTime);
} catch (InterruptedException e) {
/* ignored */
}
}
}
- Future<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
+ CompletableFuture<Optional<File>> startDownload(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
Optional<FileReferenceDownload> inProgress = downloads.get(fileReference);
if (inProgress.isPresent()) return inProgress.get().future();