summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-12-07 15:45:00 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-12-07 15:53:29 +0100
commitb9b0b7d153cde0f6d023d849c8ffdcefb533a277 (patch)
treed7e029c0fe9d4f7b9fd2f3dbd0dacd4a6f0cc474 /filedistribution
parent135ef16d40619bc7fe186b1cdfe825a39b22492d (diff)
Detach and serve from separate thread.
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java62
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java5
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java123
3 files changed, 92 insertions, 98 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index 9fe5eec54ff..ca8b8c27ad3 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -1,13 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.defaults.Defaults;
-import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.time.Duration;
@@ -17,6 +15,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
@@ -47,17 +46,28 @@ public class FileDownloader {
}
public Optional<File> getFile(FileReference fileReference) {
+ try {
+ return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ return Optional.empty();
+ }
+ }
+
+ public Future<Optional<File>> getFutureFile(FileReference fileReference) {
Objects.requireNonNull(fileReference, "file reference cannot be null");
File directory = new File(downloadDirectory, fileReference.value());
log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
Optional<File> file = getFileFromFileSystem(fileReference, directory);
if (file.isPresent()) {
- return file;
+ SettableFuture<Optional<File>> future = SettableFuture.create();
+ future.set(file);
+ return future;
} else {
log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
directory.getAbsolutePath() + ", starting download");
return queueForDownload(fileReference, timeout);
+
}
}
@@ -86,11 +96,9 @@ public class FileDownloader {
if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
File file = files[0];
if (!file.exists()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "' does not exist");
+ throw new RuntimeException("File with reference '" + fileReference.value() + "' does not exist");
} else if (!file.canRead()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "'exists, but unable to read it");
+ throw new RuntimeException("File with reference '" + fileReference.value() + "'exists, but unable to read it");
} else {
fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0);
return Optional.of(file);
@@ -99,46 +107,26 @@ public class FileDownloader {
return Optional.empty();
}
- private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) {
- if (fileReferenceDownloader.isDownloading(fileReference)) {
+ private synchronized Future<Optional<File>> queueForDownload(FileReference fileReference, Duration timeout) {
+ Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
+ if (inProgress != null) {
log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
- ListenableFuture<Optional<File>> future =
- fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " +
- Exceptions.toMessageString(e));
- }
+ return inProgress;
}
- SettableFuture<Optional<File>> future = SettableFuture.create();
- queueForDownload(new FileReferenceDownload(fileReference, future));
+ Future<Optional<File>> future = queueForDownload(fileReference);
log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
-
- try {
- Optional<File> fileDownloaded;
- try {
- log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
- fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
- log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
- } catch (TimeoutException e) {
- log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
- return Optional.empty();
- }
- return fileDownloaded;
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Could not download '" + fileReference.value() + "'");
- }
+ return future;
}
// We don't care about the future in this call
- private synchronized void queueForDownload(FileReference fileReference) {
- queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
+ private Future<Optional<File>> queueForDownload(FileReference fileReference) {
+ return queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
}
- private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) {
+ private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) {
fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
+ return fileReferenceDownload.future();
}
public FileReferenceDownloader fileReferenceDownloader() {
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index fb511411128..9de4c1fcd5b 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -2,13 +2,14 @@
package com.yahoo.vespa.filedistribution;
+import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
import java.io.File;
import java.util.Optional;
-public class FileReferenceDownload {
+class FileReferenceDownload {
private final FileReference fileReference;
private final SettableFuture<Optional<File>> future;
@@ -20,9 +21,7 @@ public class FileReferenceDownload {
FileReference fileReference() {
return fileReference;
}
-
SettableFuture<Optional<File>> future() {
return future;
}
-
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index b51a4b68212..3a9af23e6d8 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -20,6 +20,7 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -44,10 +45,7 @@ public class FileReferenceDownloader {
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
- private ExecutorService readFromQueueExecutor =
- Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue"));
private final ConnectionPool connectionPool;
- private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>();
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
@@ -56,67 +54,57 @@ public class FileReferenceDownloader {
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- readFromQueueExecutor.submit(this::readFromQueue);
this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
- private synchronized Optional<File> startDownload(FileReference fileReference,
- Duration timeout,
- FileReferenceDownload fileReferenceDownload)
- throws ExecutionException, InterruptedException, TimeoutException {
- downloads.put(fileReference, fileReferenceDownload);
- setDownloadStatus(fileReference.value(), 0.0);
-
- int numAttempts = 0;
+ private void startDownload(FileReference fileReference, Duration timeout,
+ FileReferenceDownload fileReferenceDownload)
+ {
+ synchronized (downloads) {
+ downloads.put(fileReference, fileReferenceDownload);
+ downloadStatus.put(fileReference, 0.0);
+ }
+ long end = System.currentTimeMillis() + timeout.toMillis();
boolean downloadStarted = false;
- do {
- if (startDownloadRpc(fileReference))
- downloadStarted = true;
- else
- Thread.sleep(100);
- } while (!downloadStarted && ++numAttempts <= 10); // TODO: How long/many times to retry?
-
- if (downloadStarted) {
- return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
- } else {
+ while ((System.currentTimeMillis() < end) && !downloadStarted) {
+ try {
+ if (startDownloadRpc(fileReference)) {
+ downloadStarted = true;
+ } else {
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException | ExecutionException e) {}
+ }
+
+ if ( !downloadStarted) {
fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'"));
- downloads.remove(fileReference);
- return Optional.empty();
+ synchronized (downloads) {
+ downloads.remove(fileReference);
+ }
}
}
- synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
- downloadQueue.add(fileReferenceDownload);
+ void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
+ log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
}
void receiveFile(FileReferenceData fileReferenceData) {
fileReceiver.receiveFile(fileReferenceData);
}
- synchronized Set<FileReference> queuedDownloads() {
- return downloadQueue.stream()
- .map(FileReferenceDownload::fileReference)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- private void readFromQueue() {
- do {
- FileReferenceDownload fileReferenceDownload = downloadQueue.poll();
- if (fileReferenceDownload == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) { /* ignore for now */}
+ void completedDownloading(FileReference fileReference, File file) {
+ synchronized (downloads) {
+ FileReferenceDownload download = downloads.get(fileReference);
+ if (download != null) {
+ downloadStatus.put(fileReference, 1.0);
+ downloads.remove(fileReference);
+ download.future().set(Optional.of(file));
} else {
- log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
+ log.warning("Received a file " + fileReference + " I did not ask for. Impossible");
}
- } while (true);
- }
-
- void completedDownloading(FileReference fileReference, File file) {
- if (downloads.containsKey(fileReference))
- downloads.get(fileReference).future().set(Optional.of(file));
- downloadStatus.put(fileReference, 100.0);
+ }
}
private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
@@ -146,14 +134,22 @@ public class FileReferenceDownloader {
}
}
- synchronized boolean isDownloading(FileReference fileReference) {
- return downloads.containsKey(fileReference);
+ boolean isDownloading(FileReference fileReference) {
+ synchronized (downloads) {
+ return downloads.containsKey(fileReference);
+ }
}
- synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
- FileReferenceDownload fileReferenceDownload = downloads.get(fileReference);
- fileReferenceDownload.future().addListener(runnable, downloadExecutor);
- return fileReferenceDownload.future();
+ ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ synchronized (downloads) {
+ FileReferenceDownload download = downloads.get(fileReference);
+ if (download != null) {
+ download.future().addListener(runnable, downloadExecutor);
+ return download.future();
+ }
+ }
+ return null;
+
}
private void execute(Request request, Connection connection) {
@@ -173,15 +169,26 @@ public class FileReferenceDownloader {
}
double downloadStatus(String file) {
- return downloadStatus.getOrDefault(new FileReference(file), 0.0);
+ double status = 0.0;
+ synchronized (downloads) {
+ Double download = downloadStatus.get(new FileReference(file));
+ if (download != null) {
+ status = download;
+ }
+ }
+ return status;
}
- void setDownloadStatus(String file, double percentageDownloaded) {
- downloadStatus.put(new FileReference(file), percentageDownloaded);
+ void setDownloadStatus(String file, double completeness) {
+ synchronized (downloads) {
+ downloadStatus.put(new FileReference(file), completeness);
+ }
}
Map<FileReference, Double> downloadStatus() {
- return ImmutableMap.copyOf(downloadStatus);
+ synchronized (downloads) {
+ return ImmutableMap.copyOf(downloadStatus);
+ }
}
public ConnectionPool connectionPool() {