diff options
5 files changed, 106 insertions, 41 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index e877d7f1d0c..01f2c161247 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -301,7 +301,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer List<FileReference> fileReferences = Stream.of(fileReferenceStrings) .map(FileReference::new) .collect(Collectors.toList()); - proxyServer.fileDownloader().download(fileReferences); + proxyServer.fileDownloader().queueForDownload(fileReferences); req.returnValues().add(new Int32Value(0)); } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java index 3977bbd5bfb..b9b2aa95988 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy.filedistribution; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; @@ -17,9 +16,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; @@ -35,9 +33,7 @@ public class FileDownloader { private final File downloadDirectory; private final Duration timeout; - private final FileReferenceDownloader fileReferenceDownloader; - private final ExecutorService service = Executors.newFixedThreadPool(10); public FileDownloader(ConnectionPool connectionPool) { this(connectionPool, @@ -48,7 +44,7 @@ public class FileDownloader { FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) { this.downloadDirectory = downloadDirectory; this.timeout = timeout; - this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool); + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout); } public Optional<File> getFile(FileReference fileReference) { @@ -62,12 +58,12 @@ public class FileDownloader { } else { log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + directory.getAbsolutePath() + ", starting download"); - return download(fileReference, timeout); + return queueForDownload(fileReference, timeout); } } - public void download(List<FileReference> fileReferences) { - fileReferences.forEach(fileReference -> download(fileReference, timeout)); + public void queueForDownload(List<FileReference> fileReferences) { + fileReferences.forEach(this::queueForDownload); } public void receiveFile(FileReference fileReference, String filename, byte[] content) { @@ -108,7 +104,7 @@ public class FileDownloader { return Optional.empty(); } - private synchronized Optional<File> download(FileReference fileReference, Duration timeout) { + private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) { if (fileReferenceDownloader.isDownloading(fileReference)) { log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); ListenableFuture<Optional<File>> future = @@ -121,15 +117,15 @@ public class FileDownloader { } } - SettableFuture<Optional<File>> file = SettableFuture.create(); - service.submit(() -> fileReferenceDownloader.startDownload(fileReference, timeout, file)); - log.log(LogLevel.INFO, "Started download of '" + fileReference.value() + "' with timeout " + timeout); + SettableFuture<Optional<File>> future = SettableFuture.create(); + queueForDownload(new FileReferenceDownload(fileReference, future)); + log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); try { Optional<File> fileDownloaded; try { log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); - fileDownloaded = file.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); + fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); } catch (TimeoutException e) { log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); @@ -141,8 +137,17 @@ public class FileDownloader { } } - ImmutableSet<FileReference> queuedForDownload() { - return ImmutableSet.copyOf(fileReferenceDownloader.queuedForDownload()); + // We don't care about the future in this call + private synchronized void queueForDownload(FileReference fileReference) { + queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); + } + + private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { + fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); + } + + Set<FileReference> queuedDownloads() { + return fileReferenceDownloader.queuedForDownload(); } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java new file mode 100644 index 00000000000..ce5a30dc7ad --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java @@ -0,0 +1,28 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; + +import java.io.File; +import java.util.Optional; + +public class FileReferenceDownload { + private final FileReference fileReference; + private final SettableFuture<Optional<File>> future; + + FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) { + this.fileReference = fileReference; + this.future = future; + } + + FileReference fileReference() { + return fileReference; + } + + SettableFuture<Optional<File>> future() { + return future; + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java index 775a50cfb37..af5e22ec1bf 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -2,11 +2,8 @@ package com.yahoo.vespa.config.proxy.filedistribution; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.FileReference; import com.yahoo.jrt.Request; import com.yahoo.jrt.StringValue; @@ -20,53 +17,70 @@ import java.nio.file.Files; import java.time.Duration; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Downloads file reference using rpc requests to config server and keeps track of files being downloaded * <p> - * Some methods are synchronized to make sure access to queuedForDownload is atomic + * Some methods are synchronized to make sure access to downloads is atomic * * @author hmusum */ // TODO: Add retries when a config server does not have a file reference +// TODO: Handle shutdown of executors class FileReferenceDownloader { private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); private final static Duration rpcTimeout = Duration.ofSeconds(10); private final File downloadDirectory; - private final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + private final ExecutorService downloadExecutor = + Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); + private ExecutorService readFromQueueExecutor = + Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); private final ConnectionPool connectionPool; - private final Map<FileReference, SettableFuture<Optional<File>>> queuedForDownload = new LinkedHashMap<>(); + private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>(); + private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); private final Map<FileReference, Double> downloadStatus = new HashMap<>(); + private final Duration downloadTimeout; - FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool) { + FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { this.downloadDirectory = downloadDirectory; this.connectionPool = connectionPool; + this.downloadTimeout = timeout; + readFromQueueExecutor.submit(this::readFromQueue); } - synchronized Optional<File> startDownload(FileReference fileReference, - Duration timeout, - SettableFuture<Optional<File>> file) + private synchronized Optional<File> startDownload(FileReference fileReference, + Duration timeout, + FileReferenceDownload fileReferenceDownload) throws ExecutionException, InterruptedException, TimeoutException { - queuedForDownload.put(fileReference, file); + downloads.put(fileReference, fileReferenceDownload); setDownloadStatus(fileReference.value(), 0.0); if (startDownloadRpc(fileReference)) - return file.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); else { - file.setException(new RuntimeException("Failed getting file")); - queuedForDownload.remove(fileReference); + fileReferenceDownload.future().setException(new RuntimeException("Failed getting file")); + downloads.remove(fileReference); return Optional.empty(); } } + synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { + downloadQueue.add(fileReferenceDownload); + } + void receiveFile(FileReference fileReference, String filename, byte[] content) { File fileReferenceDir = new File(downloadDirectory, fileReference.value()); try { @@ -81,12 +95,29 @@ class FileReferenceDownloader { } } - synchronized ImmutableSet<FileReference> queuedForDownload() { - return ImmutableSet.copyOf(queuedForDownload.keySet()); + synchronized Set<FileReference> queuedForDownload() { + return downloadQueue.stream() + .map(FileReferenceDownload::fileReference) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private void readFromQueue() { + do { + FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); + if (fileReferenceDownload == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { /* ignore for now */} + } else { + log.log(LogLevel.INFO, "Polling queue, found file reference '" + + fileReferenceDownload.fileReference().value() + "' to download"); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + } + } while (true); } private synchronized void completedDownloading(FileReference fileReference, File file) { - queuedForDownload.get(fileReference).set(Optional.of(file)); + downloads.get(fileReference).future().set(Optional.of(file)); downloadStatus.put(fileReference, 100.0); } @@ -112,13 +143,13 @@ class FileReferenceDownloader { } synchronized boolean isDownloading(FileReference fileReference) { - return queuedForDownload.containsKey(fileReference); + return downloads.containsKey(fileReference); } synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { - SettableFuture<Optional<File>> future = queuedForDownload.get(fileReference); - future.addListener(runnable, service); - return future; + FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); + fileReferenceDownload.future().addListener(runnable, downloadExecutor); + return fileReferenceDownload.future(); } private void execute(Request request, Connection connection) { diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java index 64ae1a07aea..dd7923acd53 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.nio.file.Files; import java.time.Duration; import java.util.Arrays; +import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; @@ -111,9 +112,9 @@ public class FileDownloaderTest { FileReference foo = new FileReference("foo"); FileReference bar = new FileReference("bar"); List<FileReference> fileReferences = Arrays.asList(foo, bar); - fileDownloader.download(fileReferences); + fileDownloader.queueForDownload(fileReferences); - assertEquals(fileReferences, fileDownloader.queuedForDownload().asList()); + assertEquals(new LinkedHashSet<>(fileReferences), fileDownloader.queuedDownloads()); // Verify download status assertDownloadStatus(fileDownloader, foo, 0.0); |