diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-12-07 15:45:00 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-12-07 15:53:29 +0100 |
commit | b9b0b7d153cde0f6d023d849c8ffdcefb533a277 (patch) | |
tree | d7e029c0fe9d4f7b9fd2f3dbd0dacd4a6f0cc474 /filedistribution | |
parent | 135ef16d40619bc7fe186b1cdfe825a39b22492d (diff) |
Detach and serve from separate thread.
Diffstat (limited to 'filedistribution')
3 files changed, 92 insertions, 98 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 9fe5eec54ff..ca8b8c27ad3 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -1,13 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.yolean.Exceptions; import java.io.File; import java.time.Duration; @@ -17,6 +15,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; @@ -47,17 +46,28 @@ public class FileDownloader { } public Optional<File> getFile(FileReference fileReference) { + try { + return getFutureFile(fileReference).get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + return Optional.empty(); + } + } + + public Future<Optional<File>> getFutureFile(FileReference fileReference) { Objects.requireNonNull(fileReference, "file reference cannot be null"); File directory = new File(downloadDirectory, fileReference.value()); log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); Optional<File> file = getFileFromFileSystem(fileReference, directory); if (file.isPresent()) { - return file; + SettableFuture<Optional<File>> future = SettableFuture.create(); + future.set(file); + return future; } else { log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + directory.getAbsolutePath() + ", starting download"); return queueForDownload(fileReference, timeout); + } } @@ -86,11 +96,9 @@ public class FileDownloader { if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { File file = files[0]; if (!file.exists()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "' does not exist"); + throw new RuntimeException("File with reference '" + fileReference.value() + "' does not exist"); } else if (!file.canRead()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "'exists, but unable to read it"); + throw new RuntimeException("File with reference '" + fileReference.value() + "'exists, but unable to read it"); } else { fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); return Optional.of(file); @@ -99,46 +107,26 @@ public class FileDownloader { return Optional.empty(); } - private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) { - if (fileReferenceDownloader.isDownloading(fileReference)) { + private synchronized Future<Optional<File>> queueForDownload(FileReference fileReference, Duration timeout) { + Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + if (inProgress != null) { log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); - ListenableFuture<Optional<File>> future = - fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); - try { - return future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + - Exceptions.toMessageString(e)); - } + return inProgress; } - SettableFuture<Optional<File>> future = SettableFuture.create(); - queueForDownload(new FileReferenceDownload(fileReference, future)); + Future<Optional<File>> future = queueForDownload(fileReference); log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); - - try { - Optional<File> fileDownloaded; - try { - log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); - fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); - log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); - } catch (TimeoutException e) { - log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); - return Optional.empty(); - } - return fileDownloaded; - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Could not download '" + fileReference.value() + "'"); - } + return future; } // We don't care about the future in this call - private synchronized void queueForDownload(FileReference fileReference) { - queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); + private Future<Optional<File>> queueForDownload(FileReference fileReference) { + return queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); } - private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { + private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) { fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); + return fileReferenceDownload.future(); } public FileReferenceDownloader fileReferenceDownloader() { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index fb511411128..9de4c1fcd5b 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -2,13 +2,14 @@ package com.yahoo.vespa.filedistribution; +import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; import java.io.File; import java.util.Optional; -public class FileReferenceDownload { +class FileReferenceDownload { private final FileReference fileReference; private final SettableFuture<Optional<File>> future; @@ -20,9 +21,7 @@ public class FileReferenceDownload { FileReference fileReference() { return fileReference; } - SettableFuture<Optional<File>> future() { return future; } - } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index b51a4b68212..3a9af23e6d8 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -20,6 +20,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -44,10 +45,7 @@ public class FileReferenceDownloader { private final ExecutorService downloadExecutor = Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); - private ExecutorService readFromQueueExecutor = - Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); private final ConnectionPool connectionPool; - private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>(); private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); private final Map<FileReference, Double> downloadStatus = new HashMap<>(); private final Duration downloadTimeout; @@ -56,67 +54,57 @@ public class FileReferenceDownloader { FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { this.connectionPool = connectionPool; this.downloadTimeout = timeout; - readFromQueueExecutor.submit(this::readFromQueue); this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory); } - private synchronized Optional<File> startDownload(FileReference fileReference, - Duration timeout, - FileReferenceDownload fileReferenceDownload) - throws ExecutionException, InterruptedException, TimeoutException { - downloads.put(fileReference, fileReferenceDownload); - setDownloadStatus(fileReference.value(), 0.0); - - int numAttempts = 0; + private void startDownload(FileReference fileReference, Duration timeout, + FileReferenceDownload fileReferenceDownload) + { + synchronized (downloads) { + downloads.put(fileReference, fileReferenceDownload); + downloadStatus.put(fileReference, 0.0); + } + long end = System.currentTimeMillis() + timeout.toMillis(); boolean downloadStarted = false; - do { - if (startDownloadRpc(fileReference)) - downloadStarted = true; - else - Thread.sleep(100); - } while (!downloadStarted && ++numAttempts <= 10); // TODO: How long/many times to retry? - - if (downloadStarted) { - return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } else { + while ((System.currentTimeMillis() < end) && !downloadStarted) { + try { + if (startDownloadRpc(fileReference)) { + downloadStarted = true; + } else { + Thread.sleep(10); + } + } + catch (InterruptedException | ExecutionException e) {} + } + + if ( !downloadStarted) { fileReferenceDownload.future().setException(new RuntimeException("Failed getting file reference '" + fileReference.value() + "'")); - downloads.remove(fileReference); - return Optional.empty(); + synchronized (downloads) { + downloads.remove(fileReference); + } } } - synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { - downloadQueue.add(fileReferenceDownload); + void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { + log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); } void receiveFile(FileReferenceData fileReferenceData) { fileReceiver.receiveFile(fileReferenceData); } - synchronized Set<FileReference> queuedDownloads() { - return downloadQueue.stream() - .map(FileReferenceDownload::fileReference) - .collect(Collectors.toCollection(LinkedHashSet::new)); - } - - private void readFromQueue() { - do { - FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); - if (fileReferenceDownload == null) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { /* ignore for now */} + void completedDownloading(FileReference fileReference, File file) { + synchronized (downloads) { + FileReferenceDownload download = downloads.get(fileReference); + if (download != null) { + downloadStatus.put(fileReference, 1.0); + downloads.remove(fileReference); + download.future().set(Optional.of(file)); } else { - log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + log.warning("Received a file " + fileReference + " I did not ask for. Impossible"); } - } while (true); - } - - void completedDownloading(FileReference fileReference, File file) { - if (downloads.containsKey(fileReference)) - downloads.get(fileReference).future().set(Optional.of(file)); - downloadStatus.put(fileReference, 100.0); + } } private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { @@ -146,14 +134,22 @@ public class FileReferenceDownloader { } } - synchronized boolean isDownloading(FileReference fileReference) { - return downloads.containsKey(fileReference); + boolean isDownloading(FileReference fileReference) { + synchronized (downloads) { + return downloads.containsKey(fileReference); + } } - synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { - FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); - fileReferenceDownload.future().addListener(runnable, downloadExecutor); - return fileReferenceDownload.future(); + ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + synchronized (downloads) { + FileReferenceDownload download = downloads.get(fileReference); + if (download != null) { + download.future().addListener(runnable, downloadExecutor); + return download.future(); + } + } + return null; + } private void execute(Request request, Connection connection) { @@ -173,15 +169,26 @@ public class FileReferenceDownloader { } double downloadStatus(String file) { - return downloadStatus.getOrDefault(new FileReference(file), 0.0); + double status = 0.0; + synchronized (downloads) { + Double download = downloadStatus.get(new FileReference(file)); + if (download != null) { + status = download; + } + } + return status; } - void setDownloadStatus(String file, double percentageDownloaded) { - downloadStatus.put(new FileReference(file), percentageDownloaded); + void setDownloadStatus(String file, double completeness) { + synchronized (downloads) { + downloadStatus.put(new FileReference(file), completeness); + } } Map<FileReference, Double> downloadStatus() { - return ImmutableMap.copyOf(downloadStatus); + synchronized (downloads) { + return ImmutableMap.copyOf(downloadStatus); + } } public ConnectionPool connectionPool() { |