summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-03-30 11:10:12 +0200
committerHarald Musum <musum@verizonmedia.com>2020-03-30 11:10:12 +0200
commitac5adb81b5f51c15ddd51a0882a7ccae1e59e96e (patch)
tree3e54ac55e3c42a01ca38240c5d5505d06cc4631b /filedistribution
parenta12f35147ac576ffdc090921b9829a1f7bcfe2e2 (diff)
Make sure to only add to downloads when rpc call succeeds
Synchronize around all code that true to start download and only add to downloads if call succeeds
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java55
1 files changed, 23 insertions, 32 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 66b86866c3e..2c71ec04ffd 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -52,39 +52,33 @@ public class FileReferenceDownloader {
new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory);
}
- private void startDownload(Duration timeout, FileReferenceDownload fileReferenceDownload) {
+ private void startDownload(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
- long end = System.currentTimeMillis() + timeout.toMillis();
- boolean downloadStarted = false;
+ long end = System.currentTimeMillis() + downloadTimeout.toMillis();
int retryCount = 0;
- while ((System.currentTimeMillis() < end) && !downloadStarted) {
- try {
- if (startDownloadRpc(fileReferenceDownload, retryCount)) {
- downloadStarted = true;
- } else {
- retryCount++;
- Thread.sleep(sleepBetweenRetries.toMillis());
- }
- }
- catch (InterruptedException e) { /* ignored */}
- }
- if ( !downloadStarted) {
- fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
- synchronized (downloads) {
- downloads.remove(fileReference);
+ log.log(LogLevel.DEBUG, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
+ synchronized (downloads) {
+ while (System.currentTimeMillis() < end) {
+ try {
+ if (startDownloadRpc(fileReferenceDownload, retryCount)) {
+ downloads.put(fileReference, fileReferenceDownload);
+ downloadStatus.put(fileReference, 0.0);
+ return;
+ } else {
+ retryCount++;
+ Thread.sleep(sleepBetweenRetries.toMillis());
+ }
+ } catch (InterruptedException e) { /* ignored */}
}
}
+
+ fileReferenceDownload.future().setException(
+ new RuntimeException("Failed to start download of file reference '" + fileReference.value() + "'"));
}
void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
- FileReference fileReference = fileReferenceDownload.fileReference();
- log.log(LogLevel.DEBUG, () -> "Will download file reference '" + fileReference.value() + "' with timeout " + downloadTimeout);
- synchronized (downloads) {
- downloads.put(fileReference, fileReferenceDownload);
- downloadStatus.put(fileReference, 0.0);
- }
- downloadExecutor.submit(() -> startDownload(downloadTimeout, fileReferenceDownload));
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload));
}
void completedDownloading(FileReference fileReference, File file) {
@@ -114,10 +108,10 @@ public class FileReferenceDownloader {
request.parameters().add(new StringValue(fileReference));
request.parameters().add(new Int32Value(fileReferenceDownload.downloadFromOtherSourceIfNotFound() ? 0 : 1));
- execute(request, connection);
+ connection.invokeSync(request, (double) rpcTimeout.getSeconds());
Level logLevel = (retryCount > 0 ? LogLevel.INFO : LogLevel.DEBUG);
if (validateResponse(request)) {
- log.log(logLevel, () -> "Request callback, OK. Req: " + request + "\nSpec: " + connection);
+ log.log(logLevel, () -> "Request callback, OK. Req: " + request + "\nSpec: " + connection + ", retry count " + retryCount);
if (request.returnValues().get(0).asInt32() == 0) {
log.log(logLevel, () -> "Found file reference '" + fileReference + "' available at " + connection.getAddress());
return true;
@@ -128,7 +122,8 @@ public class FileReferenceDownloader {
}
} else {
log.log(logLevel, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress() +
- ", error code: " + request.errorCode() + ", set error for connection and use another for next request");
+ ", error code: " + request.errorCode() + ", set error for spec, use another spec for next request" +
+ ", retry count " + retryCount);
connectionPool.setError(connection, request.errorCode());
return false;
}
@@ -151,10 +146,6 @@ public class FileReferenceDownloader {
return null;
}
- private void execute(Request request, Connection connection) {
- connection.invokeSync(request, (double) rpcTimeout.getSeconds());
- }
-
private boolean validateResponse(Request request) {
if (request.isError()) {
return false;