summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-11-07 15:15:45 +0100
committerHarald Musum <musum@oath.com>2017-11-07 15:15:45 +0100
commitb3c80e89982fdf07115b276b737ec9f535c231a9 (patch)
tree072d56d08b6cb64ed504c5d7636729ae30adc4a4 /config-proxy
parente1a47d5e9a82251ec76728943e7b5559d0b96e7c (diff)
Add a queue for downloads
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java2
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java37
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java28
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java75
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java5
5 files changed, 106 insertions, 41 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index e877d7f1d0c..01f2c161247 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -301,7 +301,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
.map(FileReference::new)
.collect(Collectors.toList());
- proxyServer.fileDownloader().download(fileReferences);
+ proxyServer.fileDownloader().queueForDownload(fileReferences);
req.returnValues().add(new Int32Value(0));
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
index 3977bbd5bfb..b9b2aa95988 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy.filedistribution;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
@@ -17,9 +16,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
@@ -35,9 +33,7 @@ public class FileDownloader {
private final File downloadDirectory;
private final Duration timeout;
-
private final FileReferenceDownloader fileReferenceDownloader;
- private final ExecutorService service = Executors.newFixedThreadPool(10);
public FileDownloader(ConnectionPool connectionPool) {
this(connectionPool,
@@ -48,7 +44,7 @@ public class FileDownloader {
FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) {
this.downloadDirectory = downloadDirectory;
this.timeout = timeout;
- this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool);
+ this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout);
}
public Optional<File> getFile(FileReference fileReference) {
@@ -62,12 +58,12 @@ public class FileDownloader {
} else {
log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
directory.getAbsolutePath() + ", starting download");
- return download(fileReference, timeout);
+ return queueForDownload(fileReference, timeout);
}
}
- public void download(List<FileReference> fileReferences) {
- fileReferences.forEach(fileReference -> download(fileReference, timeout));
+ public void queueForDownload(List<FileReference> fileReferences) {
+ fileReferences.forEach(this::queueForDownload);
}
public void receiveFile(FileReference fileReference, String filename, byte[] content) {
@@ -108,7 +104,7 @@ public class FileDownloader {
return Optional.empty();
}
- private synchronized Optional<File> download(FileReference fileReference, Duration timeout) {
+ private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) {
if (fileReferenceDownloader.isDownloading(fileReference)) {
log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
ListenableFuture<Optional<File>> future =
@@ -121,15 +117,15 @@ public class FileDownloader {
}
}
- SettableFuture<Optional<File>> file = SettableFuture.create();
- service.submit(() -> fileReferenceDownloader.startDownload(fileReference, timeout, file));
- log.log(LogLevel.INFO, "Started download of '" + fileReference.value() + "' with timeout " + timeout);
+ SettableFuture<Optional<File>> future = SettableFuture.create();
+ queueForDownload(new FileReferenceDownload(fileReference, future));
+ log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout);
try {
Optional<File> fileDownloaded;
try {
log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
- fileDownloaded = file.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
+ fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
} catch (TimeoutException e) {
log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
@@ -141,8 +137,17 @@ public class FileDownloader {
}
}
- ImmutableSet<FileReference> queuedForDownload() {
- return ImmutableSet.copyOf(fileReferenceDownloader.queuedForDownload());
+ // We don't care about the future in this call
+ private synchronized void queueForDownload(FileReference fileReference) {
+ queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create()));
+ }
+
+ private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) {
+ fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload);
+ }
+
+ Set<FileReference> queuedDownloads() {
+ return fileReferenceDownloader.queuedForDownload();
}
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java
new file mode 100644
index 00000000000..ce5a30dc7ad
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java
@@ -0,0 +1,28 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.config.FileReference;
+
+import java.io.File;
+import java.util.Optional;
+
+public class FileReferenceDownload {
+ private final FileReference fileReference;
+ private final SettableFuture<Optional<File>> future;
+
+ FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) {
+ this.fileReference = fileReference;
+ this.future = future;
+ }
+
+ FileReference fileReference() {
+ return fileReference;
+ }
+
+ SettableFuture<Optional<File>> future() {
+ return future;
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
index 775a50cfb37..af5e22ec1bf 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -2,11 +2,8 @@
package com.yahoo.vespa.config.proxy.filedistribution;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.StringValue;
@@ -20,53 +17,70 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* Downloads file reference using rpc requests to config server and keeps track of files being downloaded
* <p>
- * Some methods are synchronized to make sure access to queuedForDownload is atomic
+ * Some methods are synchronized to make sure access to downloads is atomic
*
* @author hmusum
*/
// TODO: Add retries when a config server does not have a file reference
+// TODO: Handle shutdown of executors
class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(10);
private final File downloadDirectory;
- private final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
+ private final ExecutorService downloadExecutor =
+ Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
+ private ExecutorService readFromQueueExecutor =
+ Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue"));
private final ConnectionPool connectionPool;
- private final Map<FileReference, SettableFuture<Optional<File>>> queuedForDownload = new LinkedHashMap<>();
+ private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>();
+ private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
+ private final Duration downloadTimeout;
- FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool) {
+ FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
this.downloadDirectory = downloadDirectory;
this.connectionPool = connectionPool;
+ this.downloadTimeout = timeout;
+ readFromQueueExecutor.submit(this::readFromQueue);
}
- synchronized Optional<File> startDownload(FileReference fileReference,
- Duration timeout,
- SettableFuture<Optional<File>> file)
+ private synchronized Optional<File> startDownload(FileReference fileReference,
+ Duration timeout,
+ FileReferenceDownload fileReferenceDownload)
throws ExecutionException, InterruptedException, TimeoutException {
- queuedForDownload.put(fileReference, file);
+ downloads.put(fileReference, fileReferenceDownload);
setDownloadStatus(fileReference.value(), 0.0);
if (startDownloadRpc(fileReference))
- return file.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
else {
- file.setException(new RuntimeException("Failed getting file"));
- queuedForDownload.remove(fileReference);
+ fileReferenceDownload.future().setException(new RuntimeException("Failed getting file"));
+ downloads.remove(fileReference);
return Optional.empty();
}
}
+ synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
+ downloadQueue.add(fileReferenceDownload);
+ }
+
void receiveFile(FileReference fileReference, String filename, byte[] content) {
File fileReferenceDir = new File(downloadDirectory, fileReference.value());
try {
@@ -81,12 +95,29 @@ class FileReferenceDownloader {
}
}
- synchronized ImmutableSet<FileReference> queuedForDownload() {
- return ImmutableSet.copyOf(queuedForDownload.keySet());
+ synchronized Set<FileReference> queuedForDownload() {
+ return downloadQueue.stream()
+ .map(FileReferenceDownload::fileReference)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ private void readFromQueue() {
+ do {
+ FileReferenceDownload fileReferenceDownload = downloadQueue.poll();
+ if (fileReferenceDownload == null) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) { /* ignore for now */}
+ } else {
+ log.log(LogLevel.INFO, "Polling queue, found file reference '" +
+ fileReferenceDownload.fileReference().value() + "' to download");
+ downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
+ }
+ } while (true);
}
private synchronized void completedDownloading(FileReference fileReference, File file) {
- queuedForDownload.get(fileReference).set(Optional.of(file));
+ downloads.get(fileReference).future().set(Optional.of(file));
downloadStatus.put(fileReference, 100.0);
}
@@ -112,13 +143,13 @@ class FileReferenceDownloader {
}
synchronized boolean isDownloading(FileReference fileReference) {
- return queuedForDownload.containsKey(fileReference);
+ return downloads.containsKey(fileReference);
}
synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
- SettableFuture<Optional<File>> future = queuedForDownload.get(fileReference);
- future.addListener(runnable, service);
- return future;
+ FileReferenceDownload fileReferenceDownload = downloads.get(fileReference);
+ fileReferenceDownload.future().addListener(runnable, downloadExecutor);
+ return fileReferenceDownload.future();
}
private void execute(Request request, Connection connection) {
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
index 64ae1a07aea..dd7923acd53 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -16,6 +16,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
@@ -111,9 +112,9 @@ public class FileDownloaderTest {
FileReference foo = new FileReference("foo");
FileReference bar = new FileReference("bar");
List<FileReference> fileReferences = Arrays.asList(foo, bar);
- fileDownloader.download(fileReferences);
+ fileDownloader.queueForDownload(fileReferences);
- assertEquals(fileReferences, fileDownloader.queuedForDownload().asList());
+ assertEquals(new LinkedHashSet<>(fileReferences), fileDownloader.queuedDownloads());
// Verify download status
assertDownloadStatus(fileDownloader, foo, 0.0);