summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-11-07 10:53:58 +0100
committerHarald Musum <musum@oath.com>2017-11-07 10:53:58 +0100
commit3844115235cf56c977aa580a3cd56f81ebfcc520 (patch)
tree11607e9a45284e34d9fae3536b5d3c02ef418330 /config-proxy
parent5e9a9227722eec66390b2670359a41b4703b343a (diff)
Implement downloading of file references, not functional yet
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java62
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java2
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java160
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java151
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java3
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java9
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java1
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java219
8 files changed, 498 insertions, 109 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index 1fced0b1e3d..e877d7f1d0c 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -11,8 +11,12 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
import java.io.File;
-import java.lang.*;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -109,12 +113,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
// Legacy method, needs to be the same name as used in filedistributor
supervisor.addMethod(new Method("waitFor", "s", "s",
this, "getFile")
- .methodDesc("wait for file reference")
+ .methodDesc("get path to file reference")
.paramDesc(0, "file reference", "file reference")
.returnDesc(0, "path", "path to file"));
supervisor.addMethod(new Method("filedistribution.getFile", "s", "s",
this, "getFile")
- .methodDesc("wait for file reference")
+ .methodDesc("get path to file reference")
.paramDesc(0, "file reference", "file reference")
.returnDesc(0, "path", "path to file"));
supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD",
@@ -127,6 +131,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ supervisor.addMethod(new Method("filedistribution.receiveFile", "slx", "i", // TODO Temporary method to get started with testing
+ this, "receiveFile")
+ .methodDesc("receive file reference content")
+ .paramDesc(0, "file references", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "content", "array of bytes")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
//---------------- RPC methods ------------------------------------
@@ -235,17 +246,33 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
}
+ // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
+ private static final int baseErrorCode = 0x10000;
+ private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000;
+
+ private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode;
+ private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1;
+ private static final int fileReferenceInternalError = fileReferenceRemoved + 1;
+
@SuppressWarnings({"UnusedDeclaration"})
public final void getFile(Request req) {
- // TODO: Detach to avoid holding transport thread
+ req.detach();
FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- String pathToFile = proxyServer.fileDownloader()
- .getFile(fileReference)
- .orElseGet(() -> new File(""))
- .getAbsolutePath();
-
- log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile);
- req.returnValues().add(new StringValue(pathToFile));
+ log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'");
+ Optional<File> pathToFile = proxyServer.fileDownloader().getFile(fileReference);
+ try {
+ if (pathToFile.isPresent()) {
+ req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath()));
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get());
+ } else {
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
+ req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
+ }
+ } catch (Throwable e) {
+ log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage());
+ req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed");
+ }
+ req.returnRequest();
}
@SuppressWarnings({"UnusedDeclaration"})
@@ -274,11 +301,20 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
.map(FileReference::new)
.collect(Collectors.toList());
- proxyServer.fileDownloader().queueForDownload(fileReferences);
+ proxyServer.fileDownloader().download(fileReferences);
req.returnValues().add(new Int32Value(0));
}
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String filename = req.parameters().get(1).asString();
+ byte[] content = req.parameters().get(2).asData();
+ proxyServer.fileDownloader().receiveFile(fileReference, filename, content);
+ req.returnValues().add(new Int32Value(0));
+ }
+
//----------------------------------------------------
private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index 4ee77beb2d7..5668852311f 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -83,7 +83,7 @@ public class ProxyServer implements Runnable {
this.rpcServer = createRpcServer(spec);
clientUpdater = new ClientUpdater(rpcServer, statistics, delayedResponses);
this.configClient = createClient(clientUpdater, delayedResponses, source, timingValues, memoryCache, configClient);
- this.fileDownloader = new FileDownloader(source);
+ this.fileDownloader = new FileDownloader(new JRTConnectionPool(source));
}
static ProxyServer createTestServer(ConfigSourceSet source) {
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
index 9074527e4e4..21e9b7f160b 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java
@@ -2,97 +2,147 @@
package com.yahoo.vespa.config.proxy.filedistribution;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
-import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.defaults.Defaults;
+import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.time.Duration;
-import java.time.Instant;
-import java.util.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
/**
- * Keeps track of files to download and download status
+ * Handles downloads of files (file references only for now)
*
* @author hmusum
*/
public class FileDownloader {
- private final static Logger log = Logger.getLogger(FileDownloader.class.getName());
+ private final static Logger log = Logger.getLogger(FileDownloader.class.getName());
- private final String filesDirectory;
- private final ConfigSourceSet configSourceSet;
+ private final File downloadDirectory;
private final Duration timeout;
- private final Map<FileReference, Double> downloadStatus = new HashMap<>();
- private final Set<FileReference> queuedForDownload = new LinkedHashSet<>();
- public FileDownloader(ConfigSourceSet configSourceSet) {
- this(configSourceSet,
- Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution"),
+ private final FileReferenceDownloader fileReferenceDownloader;
+ private final ExecutorService service = Executors.newFixedThreadPool(10);
+
+ public FileDownloader(ConnectionPool connectionPool) {
+ this(connectionPool,
+ new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")),
Duration.ofMinutes(15));
}
- FileDownloader(ConfigSourceSet configSourceSet, String filesDirectory, Duration timeout) {
- this.configSourceSet = configSourceSet;
- this.filesDirectory = filesDirectory;
+ FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) {
+ this.downloadDirectory = downloadDirectory;
this.timeout = timeout;
+ this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool);
}
public Optional<File> getFile(FileReference fileReference) {
Objects.requireNonNull(fileReference, "file reference cannot be null");
- File directory = new File(filesDirectory, fileReference.value()); // directory with one file
-
+ File directory = new File(downloadDirectory, fileReference.value());
log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' ");
- Instant end = Instant.now().plus(timeout);
- do {
- File[] files = directory.listFiles();
- if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
- if (files.length != 1) {
- throw new RuntimeException("More than one file in '" + fileReference.value() +
- "', expected only one, unable to proceed");
- }
- File file = files[0];
- if (!file.exists()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "' does not exist");
- } else if (!file.canRead()) {
- throw new RuntimeException("File with reference '" + fileReference.value() +
- "'exists, but unable to read it");
- } else {
- downloadStatus.put(fileReference, 100.0);
- return Optional.of(file);
- }
- } else {
- queueForDownload(fileReference);
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } while (Instant.now().isBefore(end));
- return Optional.empty();
+ Optional<File> file = getFileFromFileSystem(fileReference, directory);
+ if (file.isPresent()) {
+ return file;
+ } else {
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " +
+ directory.getAbsolutePath() + ", starting download");
+ return download(fileReference, timeout);
+ }
+ }
+
+ public void download(List<FileReference> fileReferences) {
+ fileReferences.forEach(fileReference -> download(fileReference, timeout));
+ }
+
+ public void receiveFile(FileReference fileReference, String filename, byte[] content) {
+ fileReferenceDownloader.receiveFile(fileReference, filename, content);
+ }
+
+ double downloadStatus(FileReference fileReference) {
+ return fileReferenceDownloader.downloadStatus(fileReference.value());
}
public Map<FileReference, Double> downloadStatus() {
- return downloadStatus;
+ return fileReferenceDownloader.downloadStatus();
+ }
+
+ File downloadDirectory() {
+ return downloadDirectory;
}
- public void queueForDownload(List<FileReference> fileReferences) {
- fileReferences.forEach(this::queueForDownload);
+ private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) {
+ File[] files = directory.listFiles();
+ if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
+ if (files.length != 1) {
+ throw new RuntimeException("More than one file in '" + fileReference.value() +
+ "', expected only one, unable to proceed");
+ }
+ File file = files[0];
+ if (!file.exists()) {
+ throw new RuntimeException("File with reference '" + fileReference.value() +
+ "' does not exist");
+ } else if (!file.canRead()) {
+ throw new RuntimeException("File with reference '" + fileReference.value() +
+ "'exists, but unable to read it");
+ } else {
+ fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0);
+ return Optional.of(file);
+ }
+ }
+ return Optional.empty();
}
- private void queueForDownload(FileReference fileReference) {
- log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download ");
- queuedForDownload.add(fileReference);
- downloadStatus.put(fileReference, 0.0);
+ private synchronized Optional<File> download(FileReference fileReference, Duration timeout) {
+ if (fileReferenceDownloader.isDownloading(fileReference)) {
+ log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'");
+ ListenableFuture<Optional<File>> future =
+ fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference));
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " +
+ Exceptions.toMessageString(e));
+ }
+ }
+
+ SettableFuture<Optional<File>> file = SettableFuture.create();
+ service.submit(() -> fileReferenceDownloader.startDownload(fileReference, timeout, file));
+ log.log(LogLevel.INFO, "Started download of '" + fileReference.value() + "' with timeout " + timeout);
+
+ try {
+ Optional<File> fileDownloaded;
+ try {
+ log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download");
+ fileDownloaded = file.get(timeout.getSeconds() - 1, TimeUnit.SECONDS);
+ log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded");
+ } catch (TimeoutException e) {
+ log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out");
+ return Optional.empty();
+ }
+ return fileDownloaded;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Could not download '" + fileReference.value() + "'");
+ }
}
ImmutableSet<FileReference> queuedForDownload() {
- return ImmutableSet.copyOf(queuedForDownload);
+ return ImmutableSet.copyOf(fileReferenceDownloader.queuedForDownload().keySet());
}
-} \ No newline at end of file
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
new file mode 100644
index 00000000000..07b2094ef35
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -0,0 +1,151 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.Connection;
+import com.yahoo.vespa.config.ConnectionPool;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+/**
+ * Downloads file reference using rpc requests to config server and keeps track of files being downloaded
+ * <p>
+ * Some methods are synchronized to make sure access to queuedForDownload is atomic
+ *
+ * @author hmusum
+ */
+// TODO: Add retries when a config server does not have a file reference
+class FileReferenceDownloader {
+
+ private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
+ private final static Duration rpcTimeout = Duration.ofSeconds(10);
+
+ private final File downloadDirectory;
+ private final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
+ private final ConnectionPool connectionPool;
+ private final Map<FileReference, SettableFuture<Optional<File>>> queuedForDownload = new LinkedHashMap<>();
+ private final Map<FileReference, Double> downloadStatus = new HashMap<>();
+
+ FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool) {
+ this.downloadDirectory = downloadDirectory;
+ this.connectionPool = connectionPool;
+ }
+
+ synchronized Optional<File> startDownload(FileReference fileReference,
+ Duration timeout,
+ SettableFuture<Optional<File>> file)
+ throws ExecutionException, InterruptedException, TimeoutException {
+ queuedForDownload.put(fileReference, file);
+ setDownloadStatus(fileReference.value(), 0.0);
+ if (startDownloadRpc(fileReference))
+ return file.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ else {
+ file.setException(new RuntimeException("Failed getting file"));
+ queuedForDownload.remove(fileReference);
+ return Optional.empty();
+ }
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content) {
+ File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ File file = new File(fileReferenceDir, filename);
+ log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
+ Files.write(file.toPath(), content);
+ completedDownloading(fileReference, file);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
+ throw new RuntimeException("Failed writing file: ", e);
+ }
+ }
+
+ Map<FileReference, SettableFuture<Optional<File>>> queuedForDownload() {
+ return queuedForDownload;
+ }
+
+ private synchronized void completedDownloading(FileReference fileReference, File file) {
+ queuedForDownload.get(fileReference).set(Optional.of(file));
+ downloadStatus.put(fileReference, 100.0);
+ }
+
+ private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
+ Connection connection = connectionPool.getCurrent();
+ Request request = new Request("filedistribution.serveFile");
+ request.parameters().add(new StringValue(fileReference.value()));
+
+ execute(request, connection);
+ if (validateResponse(request)) {
+ log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection);
+ if (request.returnValues().get(0).asInt32() == 0)
+ log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress());
+ else
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress());
+ return true;
+ } else {
+ log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress());
+ connection.setError(request.errorCode());
+ // TODO: Retry with another config server
+ return false;
+ }
+ }
+
+ synchronized boolean isDownloading(FileReference fileReference) {
+ return queuedForDownload.containsKey(fileReference);
+ }
+
+ synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) {
+ SettableFuture<Optional<File>> future = queuedForDownload.get(fileReference);
+ future.addListener(runnable, service);
+ return future;
+ }
+
+ private void execute(Request request, Connection connection) {
+ connection.invokeSync(request, (double) rpcTimeout.getSeconds());
+ }
+
+ private boolean validateResponse(Request request) {
+ if (request.isError()) {
+ return false;
+ } else if (request.returnValues().size() == 0) {
+ return false;
+ } else if (!request.checkReturnTypes("i")) {
+ log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage());
+ return false;
+ }
+ return true;
+ }
+
+ double downloadStatus(String file) {
+ return downloadStatus.getOrDefault(new FileReference(file), 0.0);
+ }
+
+ void setDownloadStatus(String file, double percentageDownloaded) {
+ downloadStatus.put(new FileReference(file), percentageDownloaded);
+ }
+
+ Map<FileReference, Double> downloadStatus() {
+ return ImmutableMap.copyOf(downloadStatus);
+ }
+}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
index c1e9826e29f..f9b334a6f87 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
@@ -38,8 +38,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void basic() {
- ConfigSourceSet configSources = new ConfigSourceSet();
- ProxyServer proxy = ProxyServer.createTestServer(configSources);
+ ProxyServer proxy = ProxyServer.createTestServer(new MockConfigSource(new MockClientUpdater()));
Spec spec = new Spec("localhost", 12345);
ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, spec);
assertThat(server.getSpec(), is(spec));
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java
index dc9a3408510..2b26996fbdc 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import com.yahoo.config.subscription.ConfigSource;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.RawConfig;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Set;
/**
* A simple class to be able to test config proxy without having an RPC config
@@ -37,4 +39,9 @@ class MockConfigSource extends ConfigSourceSet {
backing.clear();
}
+ @Override
+ public Set<String> getSources() {
+ return Collections.singleton("tcp/localhost:19070,tcp/localhost:19071,tcp/localhost:19072");
+ }
+
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
index f82d9e90184..3cd0f1043cc 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
@@ -95,7 +95,6 @@ public class ProxyServerTest {
*/
@Test
public void testModeSwitch() {
- ConfigSourceSet source = new ConfigSourceSet(); // Need to use a ConfigSourceSet to test modes
ProxyServer proxy = ProxyServer.createTestServer(source);
assertTrue(proxy.getMode().isDefault());
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
index ea880e451b6..64ae1a07aea 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -1,79 +1,226 @@
package com.yahoo.vespa.config.proxy.filedistribution;
import com.yahoo.config.FileReference;
-import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.io.IOUtils;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.text.Utf8;
+import com.yahoo.vespa.config.Connection;
+import com.yahoo.vespa.config.ConnectionPool;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
-import java.util.*;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class FileDownloaderTest {
- private static final ConfigSourceSet configSourceSet = new ConfigSourceSet();
+
+ private MockConnection connection;
+ private FileDownloader fileDownloader;
+
+ @Before
+ public void setup() {
+ try {
+ File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ connection = new MockConnection();
+ fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
@Test
- public void download() throws IOException {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200));
+ public void getFile() throws IOException {
+ File downloadDir = fileDownloader.downloadDirectory();
- // Write a file to download directory to simulate download going OK
- String fileReferenceString = "somehash";
- String fileName = "foo.jar";
- File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString);
- FileReference fileReference = writeFileReference(downloadDir, fileReferenceString, fileName);
+ {
+ // fileReference already exists on disk, does not have to be downloaded
- // Check that we get correct path and content when asking for file reference
- Optional<File> pathToFile = fileDownloader.getFile(fileReference);
- assertTrue(pathToFile.isPresent());
- String downloadedFile = new File(fileReferenceFullPath, fileName).getAbsolutePath();
- assertEquals(new File(fileReferenceFullPath, fileName).getAbsolutePath(), downloadedFile);
- assertEquals("content", IOUtils.readFile(pathToFile.get()));
+ String fileReferenceString = "foo";
+ String filename = "foo.jar";
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString);
+ FileReference fileReference = new FileReference(fileReferenceString);
+ writeFileReference(downloadDir, fileReferenceString, filename);
- // Verify download status
- Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus();
- assertEquals(1, downloadStatus.size());
- assertDownloadStatus(Collections.singletonList(fileReference), downloadStatus.entrySet().iterator().next(), 100.0);
+ // Check that we get correct path and content when asking for file reference
+ Optional<File> pathToFile = fileDownloader.getFile(fileReference);
+ assertTrue(pathToFile.isPresent());
+ String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath();
+ assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile);
+ assertEquals("content", IOUtils.readFile(pathToFile.get()));
+
+ // Verify download status when downloaded
+ assertDownloadStatus(fileDownloader, fileReference, 100.0);
+ }
+
+ {
+ // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file)
+
+ connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler());
+
+ FileReference fileReference = new FileReference("bar");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value());
+ assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
+
+ // Verify download status when unable to download
+ assertDownloadStatus(fileDownloader, fileReference, 0.0);
+ }
+
+ {
+ // fileReference does not exist on disk, needs to be downloaded)
- // Non-existing file
- assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(new FileReference("doesnotexist")).isPresent());
+ FileReference fileReference = new FileReference("fileReference");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value());
+ assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
+
+ // Verify download status
+ assertDownloadStatus(fileDownloader, fileReference, 0.0);
+
+ // Receives fileReference, should return and make it available to caller
+ String filename = "abc.jar";
+ fileDownloader.receiveFile(fileReference, filename, Utf8.toBytes("some other content"));
+ Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
+
+ assertTrue(downloadedFile.isPresent());
+ File downloadedFileFullPath = new File(fileReferenceFullPath, filename);
+ assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath());
+ assertEquals("some other content", IOUtils.readFile(downloadedFile.get()));
+
+ // Verify download status when downloaded
+ assertDownloadStatus(fileDownloader, fileReference, 100.0);
+ }
}
@Test
public void setFilesToDownload() throws IOException {
File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200));
- List<FileReference> fileReferences = Arrays.asList(new FileReference("foo"), new FileReference("bar"));
- fileDownloader.queueForDownload(fileReferences);
+ MockConnection configSource = new MockConnection();
+ FileDownloader fileDownloader = new FileDownloader(configSource, downloadDir, Duration.ofMillis(200));
+ FileReference foo = new FileReference("foo");
+ FileReference bar = new FileReference("bar");
+ List<FileReference> fileReferences = Arrays.asList(foo, bar);
+ fileDownloader.download(fileReferences);
assertEquals(fileReferences, fileDownloader.queuedForDownload().asList());
// Verify download status
- Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus();
- assertEquals(2, downloadStatus.size());
-
- assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0);
- assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0);
+ assertDownloadStatus(fileDownloader, foo, 0.0);
+ assertDownloadStatus(fileDownloader, bar, 0.0);
}
- private FileReference writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException {
+ private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException {
File file = new File(new File(dir, fileReferenceString), fileName);
IOUtils.writeFile(file, "content", false);
- return new FileReference(fileReferenceString);
}
private File fileReferenceFullPath(File dir, String fileReferenceString) {
return new File(dir, fileReferenceString);
}
- private void assertDownloadStatus(List<FileReference> fileReferences, Map.Entry<FileReference, Double> entry, double expectedDownloadStatus) {
- assertTrue(fileReferences.contains(new FileReference(entry.getKey().value())));
- assertEquals(expectedDownloadStatus, entry.getValue(), 0.0001);
+ private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) {
+ double downloadStatus = fileDownloader.downloadStatus(fileReference);
+ assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
+ }
+
+ private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {
+
+ private ResponseHandler responseHandler;
+
+ MockConnection() {
+ this(new FileReferenceFoundResponseHandler());
+ }
+
+ MockConnection(ResponseHandler responseHandler) {
+ this.responseHandler = responseHandler;
+ }
+
+ @Override
+ public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) {
+ responseHandler.request(request);
+ }
+
+ @Override
+ public void invokeSync(Request request, double jrtTimeout) {
+ responseHandler.request(request);
+ }
+
+ @Override
+ public void setError(int errorCode) {
+ }
+
+ @Override
+ public void setSuccess() {
+ }
+
+ @Override
+ public String getAddress() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void setError(Connection connection, int errorCode) {
+ connection.setError(errorCode);
+ }
+
+ @Override
+ public Connection getCurrent() {
+ return this;
+ }
+
+ @Override
+ public Connection setNewCurrentConnection() {
+ return this;
+ }
+
+ @Override
+ public int getSize() {
+ return 1;
+ }
+
+ public void setResponseHandler(ResponseHandler responseHandler) {
+ this.responseHandler = responseHandler;
+ }
+
+ static class FileReferenceFoundResponseHandler implements ResponseHandler {
+
+ @Override
+ public void request(Request request) {
+ if (request.methodName().equals("filedistribution.serveFile"))
+ request.returnValues().add(new Int32Value(0));
+ }
+ }
+
+ static class UnknownFileReferenceResponseHandler implements ResponseHandler {
+
+ @Override
+ public void request(Request request) {
+ if (request.methodName().equals("filedistribution.serveFile"))
+ request.returnValues().add(new Int32Value(1));
+ }
+ }
+
+ public interface ResponseHandler {
+
+ void request(Request request);
+
+ }
+
}
+
}