diff options
author | Harald Musum <musum@oath.com> | 2017-11-07 10:53:58 +0100 |
---|---|---|
committer | Harald Musum <musum@oath.com> | 2017-11-07 10:53:58 +0100 |
commit | 3844115235cf56c977aa580a3cd56f81ebfcc520 (patch) | |
tree | 11607e9a45284e34d9fae3536b5d3c02ef418330 | |
parent | 5e9a9227722eec66390b2670359a41b4703b343a (diff) |
Implement downloading of file references, not functional yet
12 files changed, 518 insertions, 120 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index 1fced0b1e3d..e877d7f1d0c 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -11,8 +11,12 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; import java.io.File; -import java.lang.*; -import java.util.*; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -109,12 +113,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer // Legacy method, needs to be the same name as used in filedistributor supervisor.addMethod(new Method("waitFor", "s", "s", this, "getFile") - .methodDesc("wait for file reference") + .methodDesc("get path to file reference") .paramDesc(0, "file reference", "file reference") .returnDesc(0, "path", "path to file")); supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", this, "getFile") - .methodDesc("wait for file reference") + .methodDesc("get path to file reference") .paramDesc(0, "file reference", "file reference") .returnDesc(0, "path", "path to file")); supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", @@ -127,6 +131,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer .methodDesc("set which file references to download") .paramDesc(0, "file references", "file reference to download") .returnDesc(0, "ret", "0 if success, 1 otherwise")); + supervisor.addMethod(new Method("filedistribution.receiveFile", "slx", "i", // TODO Temporary method to get started with testing + this, "receiveFile") + .methodDesc("receive file reference content") + .paramDesc(0, "file references", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "content", "array of bytes") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); } //---------------- RPC methods ------------------------------------ @@ -235,17 +246,33 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); } + // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? + private static final int baseErrorCode = 0x10000; + private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + private static final int fileReferenceInternalError = fileReferenceRemoved + 1; + @SuppressWarnings({"UnusedDeclaration"}) public final void getFile(Request req) { - // TODO: Detach to avoid holding transport thread + req.detach(); FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - String pathToFile = proxyServer.fileDownloader() - .getFile(fileReference) - .orElseGet(() -> new File("")) - .getAbsolutePath(); - - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile); - req.returnValues().add(new StringValue(pathToFile)); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = proxyServer.fileDownloader().getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); } @SuppressWarnings({"UnusedDeclaration"}) @@ -274,11 +301,20 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer List<FileReference> fileReferences = Stream.of(fileReferenceStrings) .map(FileReference::new) .collect(Collectors.toList()); - proxyServer.fileDownloader().queueForDownload(fileReferences); + proxyServer.fileDownloader().download(fileReferences); req.returnValues().add(new Int32Value(0)); } + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String filename = req.parameters().get(1).asString(); + byte[] content = req.parameters().get(2).asData(); + proxyServer.fileDownloader().receiveFile(fileReference, filename, content); + req.returnValues().add(new Int32Value(0)); + } + //---------------------------------------------------- private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 4ee77beb2d7..5668852311f 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -83,7 +83,7 @@ public class ProxyServer implements Runnable { this.rpcServer = createRpcServer(spec); clientUpdater = new ClientUpdater(rpcServer, statistics, delayedResponses); this.configClient = createClient(clientUpdater, delayedResponses, source, timingValues, memoryCache, configClient); - this.fileDownloader = new FileDownloader(source); + this.fileDownloader = new FileDownloader(new JRTConnectionPool(source)); } static ProxyServer createTestServer(ConfigSourceSet source) { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java index 9074527e4e4..21e9b7f160b 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java @@ -2,97 +2,147 @@ package com.yahoo.vespa.config.proxy.filedistribution; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; -import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; import java.io.File; import java.time.Duration; -import java.time.Instant; -import java.util.*; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Logger; /** - * Keeps track of files to download and download status + * Handles downloads of files (file references only for now) * * @author hmusum */ public class FileDownloader { - private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); + private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); - private final String filesDirectory; - private final ConfigSourceSet configSourceSet; + private final File downloadDirectory; private final Duration timeout; - private final Map<FileReference, Double> downloadStatus = new HashMap<>(); - private final Set<FileReference> queuedForDownload = new LinkedHashSet<>(); - public FileDownloader(ConfigSourceSet configSourceSet) { - this(configSourceSet, - Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution"), + private final FileReferenceDownloader fileReferenceDownloader; + private final ExecutorService service = Executors.newFixedThreadPool(10); + + public FileDownloader(ConnectionPool connectionPool) { + this(connectionPool, + new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), Duration.ofMinutes(15)); } - FileDownloader(ConfigSourceSet configSourceSet, String filesDirectory, Duration timeout) { - this.configSourceSet = configSourceSet; - this.filesDirectory = filesDirectory; + FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) { + this.downloadDirectory = downloadDirectory; this.timeout = timeout; + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool); } public Optional<File> getFile(FileReference fileReference) { Objects.requireNonNull(fileReference, "file reference cannot be null"); - File directory = new File(filesDirectory, fileReference.value()); // directory with one file - + File directory = new File(downloadDirectory, fileReference.value()); log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); - Instant end = Instant.now().plus(timeout); - do { - File[] files = directory.listFiles(); - if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { - if (files.length != 1) { - throw new RuntimeException("More than one file in '" + fileReference.value() + - "', expected only one, unable to proceed"); - } - File file = files[0]; - if (!file.exists()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "' does not exist"); - } else if (!file.canRead()) { - throw new RuntimeException("File with reference '" + fileReference.value() + - "'exists, but unable to read it"); - } else { - downloadStatus.put(fileReference, 100.0); - return Optional.of(file); - } - } else { - queueForDownload(fileReference); - } - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } while (Instant.now().isBefore(end)); - return Optional.empty(); + Optional<File> file = getFileFromFileSystem(fileReference, directory); + if (file.isPresent()) { + return file; + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + + directory.getAbsolutePath() + ", starting download"); + return download(fileReference, timeout); + } + } + + public void download(List<FileReference> fileReferences) { + fileReferences.forEach(fileReference -> download(fileReference, timeout)); + } + + public void receiveFile(FileReference fileReference, String filename, byte[] content) { + fileReferenceDownloader.receiveFile(fileReference, filename, content); + } + + double downloadStatus(FileReference fileReference) { + return fileReferenceDownloader.downloadStatus(fileReference.value()); } public Map<FileReference, Double> downloadStatus() { - return downloadStatus; + return fileReferenceDownloader.downloadStatus(); + } + + File downloadDirectory() { + return downloadDirectory; } - public void queueForDownload(List<FileReference> fileReferences) { - fileReferences.forEach(this::queueForDownload); + private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) { + File[] files = directory.listFiles(); + if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + if (files.length != 1) { + throw new RuntimeException("More than one file in '" + fileReference.value() + + "', expected only one, unable to proceed"); + } + File file = files[0]; + if (!file.exists()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "' does not exist"); + } else if (!file.canRead()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "'exists, but unable to read it"); + } else { + fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); + return Optional.of(file); + } + } + return Optional.empty(); } - private void queueForDownload(FileReference fileReference) { - log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download "); - queuedForDownload.add(fileReference); - downloadStatus.put(fileReference, 0.0); + private synchronized Optional<File> download(FileReference fileReference, Duration timeout) { + if (fileReferenceDownloader.isDownloading(fileReference)) { + log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); + ListenableFuture<Optional<File>> future = + fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + + Exceptions.toMessageString(e)); + } + } + + SettableFuture<Optional<File>> file = SettableFuture.create(); + service.submit(() -> fileReferenceDownloader.startDownload(fileReference, timeout, file)); + log.log(LogLevel.INFO, "Started download of '" + fileReference.value() + "' with timeout " + timeout); + + try { + Optional<File> fileDownloaded; + try { + log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); + fileDownloaded = file.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); + log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); + } catch (TimeoutException e) { + log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); + return Optional.empty(); + } + return fileDownloaded; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Could not download '" + fileReference.value() + "'"); + } } ImmutableSet<FileReference> queuedForDownload() { - return ImmutableSet.copyOf(queuedForDownload); + return ImmutableSet.copyOf(fileReferenceDownloader.queuedForDownload().keySet()); } -}
\ No newline at end of file +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java new file mode 100644 index 00000000000..07b2094ef35 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -0,0 +1,151 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringValue; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +/** + * Downloads file reference using rpc requests to config server and keeps track of files being downloaded + * <p> + * Some methods are synchronized to make sure access to queuedForDownload is atomic + * + * @author hmusum + */ +// TODO: Add retries when a config server does not have a file reference +class FileReferenceDownloader { + + private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); + private final static Duration rpcTimeout = Duration.ofSeconds(10); + + private final File downloadDirectory; + private final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + private final ConnectionPool connectionPool; + private final Map<FileReference, SettableFuture<Optional<File>>> queuedForDownload = new LinkedHashMap<>(); + private final Map<FileReference, Double> downloadStatus = new HashMap<>(); + + FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool) { + this.downloadDirectory = downloadDirectory; + this.connectionPool = connectionPool; + } + + synchronized Optional<File> startDownload(FileReference fileReference, + Duration timeout, + SettableFuture<Optional<File>> file) + throws ExecutionException, InterruptedException, TimeoutException { + queuedForDownload.put(fileReference, file); + setDownloadStatus(fileReference.value(), 0.0); + if (startDownloadRpc(fileReference)) + return file.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + else { + file.setException(new RuntimeException("Failed getting file")); + queuedForDownload.remove(fileReference); + return Optional.empty(); + } + } + + void receiveFile(FileReference fileReference, String filename, byte[] content) { + File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + File file = new File(fileReferenceDir, filename); + log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); + Files.write(file.toPath(), content); + completedDownloading(fileReference, file); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); + throw new RuntimeException("Failed writing file: ", e); + } + } + + Map<FileReference, SettableFuture<Optional<File>>> queuedForDownload() { + return queuedForDownload; + } + + private synchronized void completedDownloading(FileReference fileReference, File file) { + queuedForDownload.get(fileReference).set(Optional.of(file)); + downloadStatus.put(fileReference, 100.0); + } + + private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { + Connection connection = connectionPool.getCurrent(); + Request request = new Request("filedistribution.serveFile"); + request.parameters().add(new StringValue(fileReference.value())); + + execute(request, connection); + if (validateResponse(request)) { + log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection); + if (request.returnValues().get(0).asInt32() == 0) + log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress()); + else + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); + return true; + } else { + log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress()); + connection.setError(request.errorCode()); + // TODO: Retry with another config server + return false; + } + } + + synchronized boolean isDownloading(FileReference fileReference) { + return queuedForDownload.containsKey(fileReference); + } + + synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + SettableFuture<Optional<File>> future = queuedForDownload.get(fileReference); + future.addListener(runnable, service); + return future; + } + + private void execute(Request request, Connection connection) { + connection.invokeSync(request, (double) rpcTimeout.getSeconds()); + } + + private boolean validateResponse(Request request) { + if (request.isError()) { + return false; + } else if (request.returnValues().size() == 0) { + return false; + } else if (!request.checkReturnTypes("i")) { + log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage()); + return false; + } + return true; + } + + double downloadStatus(String file) { + return downloadStatus.getOrDefault(new FileReference(file), 0.0); + } + + void setDownloadStatus(String file, double percentageDownloaded) { + downloadStatus.put(new FileReference(file), percentageDownloaded); + } + + Map<FileReference, Double> downloadStatus() { + return ImmutableMap.copyOf(downloadStatus); + } +} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index c1e9826e29f..f9b334a6f87 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -38,8 +38,7 @@ public class ConfigProxyRpcServerTest { @Test public void basic() { - ConfigSourceSet configSources = new ConfigSourceSet(); - ProxyServer proxy = ProxyServer.createTestServer(configSources); + ProxyServer proxy = ProxyServer.createTestServer(new MockConfigSource(new MockClientUpdater())); Spec spec = new Spec("localhost", 12345); ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, spec); assertThat(server.getSpec(), is(spec)); diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java index dc9a3408510..2b26996fbdc 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java @@ -1,12 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.config.subscription.ConfigSource; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.RawConfig; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Set; /** * A simple class to be able to test config proxy without having an RPC config @@ -37,4 +39,9 @@ class MockConfigSource extends ConfigSourceSet { backing.clear(); } + @Override + public Set<String> getSources() { + return Collections.singleton("tcp/localhost:19070,tcp/localhost:19071,tcp/localhost:19072"); + } + } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java index f82d9e90184..3cd0f1043cc 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java @@ -95,7 +95,6 @@ public class ProxyServerTest { */ @Test public void testModeSwitch() { - ConfigSourceSet source = new ConfigSourceSet(); // Need to use a ConfigSourceSet to test modes ProxyServer proxy = ProxyServer.createTestServer(source); assertTrue(proxy.getMode().isDefault()); diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java index ea880e451b6..64ae1a07aea 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -1,79 +1,226 @@ package com.yahoo.vespa.config.proxy.filedistribution; import com.yahoo.config.FileReference; -import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.text.Utf8; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.time.Duration; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class FileDownloaderTest { - private static final ConfigSourceSet configSourceSet = new ConfigSourceSet(); + + private MockConnection connection; + private FileDownloader fileDownloader; + + @Before + public void setup() { + try { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + connection = new MockConnection(); + fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } @Test - public void download() throws IOException { - File downloadDir = Files.createTempDirectory("filedistribution").toFile(); - FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200)); + public void getFile() throws IOException { + File downloadDir = fileDownloader.downloadDirectory(); - // Write a file to download directory to simulate download going OK - String fileReferenceString = "somehash"; - String fileName = "foo.jar"; - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString); - FileReference fileReference = writeFileReference(downloadDir, fileReferenceString, fileName); + { + // fileReference already exists on disk, does not have to be downloaded - // Check that we get correct path and content when asking for file reference - Optional<File> pathToFile = fileDownloader.getFile(fileReference); - assertTrue(pathToFile.isPresent()); - String downloadedFile = new File(fileReferenceFullPath, fileName).getAbsolutePath(); - assertEquals(new File(fileReferenceFullPath, fileName).getAbsolutePath(), downloadedFile); - assertEquals("content", IOUtils.readFile(pathToFile.get())); + String fileReferenceString = "foo"; + String filename = "foo.jar"; + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString); + FileReference fileReference = new FileReference(fileReferenceString); + writeFileReference(downloadDir, fileReferenceString, filename); - // Verify download status - Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus(); - assertEquals(1, downloadStatus.size()); - assertDownloadStatus(Collections.singletonList(fileReference), downloadStatus.entrySet().iterator().next(), 100.0); + // Check that we get correct path and content when asking for file reference + Optional<File> pathToFile = fileDownloader.getFile(fileReference); + assertTrue(pathToFile.isPresent()); + String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath(); + assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile); + assertEquals("content", IOUtils.readFile(pathToFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file) + + connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler()); + + FileReference fileReference = new FileReference("bar"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status when unable to download + assertDownloadStatus(fileDownloader, fileReference, 0.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded) - // Non-existing file - assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(new FileReference("doesnotexist")).isPresent()); + FileReference fileReference = new FileReference("fileReference"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status + assertDownloadStatus(fileDownloader, fileReference, 0.0); + + // Receives fileReference, should return and make it available to caller + String filename = "abc.jar"; + fileDownloader.receiveFile(fileReference, filename, Utf8.toBytes("some other content")); + Optional<File> downloadedFile = fileDownloader.getFile(fileReference); + + assertTrue(downloadedFile.isPresent()); + File downloadedFileFullPath = new File(fileReferenceFullPath, filename); + assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); + assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } } @Test public void setFilesToDownload() throws IOException { File downloadDir = Files.createTempDirectory("filedistribution").toFile(); - FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200)); - List<FileReference> fileReferences = Arrays.asList(new FileReference("foo"), new FileReference("bar")); - fileDownloader.queueForDownload(fileReferences); + MockConnection configSource = new MockConnection(); + FileDownloader fileDownloader = new FileDownloader(configSource, downloadDir, Duration.ofMillis(200)); + FileReference foo = new FileReference("foo"); + FileReference bar = new FileReference("bar"); + List<FileReference> fileReferences = Arrays.asList(foo, bar); + fileDownloader.download(fileReferences); assertEquals(fileReferences, fileDownloader.queuedForDownload().asList()); // Verify download status - Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus(); - assertEquals(2, downloadStatus.size()); - - assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0); - assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0); + assertDownloadStatus(fileDownloader, foo, 0.0); + assertDownloadStatus(fileDownloader, bar, 0.0); } - private FileReference writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { + private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { File file = new File(new File(dir, fileReferenceString), fileName); IOUtils.writeFile(file, "content", false); - return new FileReference(fileReferenceString); } private File fileReferenceFullPath(File dir, String fileReferenceString) { return new File(dir, fileReferenceString); } - private void assertDownloadStatus(List<FileReference> fileReferences, Map.Entry<FileReference, Double> entry, double expectedDownloadStatus) { - assertTrue(fileReferences.contains(new FileReference(entry.getKey().value()))); - assertEquals(expectedDownloadStatus, entry.getValue(), 0.0001); + private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) { + double downloadStatus = fileDownloader.downloadStatus(fileReference); + assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); + } + + private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { + + private ResponseHandler responseHandler; + + MockConnection() { + this(new FileReferenceFoundResponseHandler()); + } + + MockConnection(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + @Override + public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) { + responseHandler.request(request); + } + + @Override + public void invokeSync(Request request, double jrtTimeout) { + responseHandler.request(request); + } + + @Override + public void setError(int errorCode) { + } + + @Override + public void setSuccess() { + } + + @Override + public String getAddress() { + return null; + } + + @Override + public void close() { + } + + @Override + public void setError(Connection connection, int errorCode) { + connection.setError(errorCode); + } + + @Override + public Connection getCurrent() { + return this; + } + + @Override + public Connection setNewCurrentConnection() { + return this; + } + + @Override + public int getSize() { + return 1; + } + + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + static class FileReferenceFoundResponseHandler implements ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) + request.returnValues().add(new Int32Value(0)); + } + } + + static class UnknownFileReferenceResponseHandler implements ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) + request.returnValues().add(new Int32Value(1)); + } + } + + public interface ResponseHandler { + + void request(Request request); + + } + } + } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java index 3509a960740..bd9a49c2fe2 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java @@ -53,6 +53,12 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co } @Override + public void invokeSync(Request request, double jrtTimeout) { + numberOfRequests++; + lastRequest = request; + } + + @Override public void setError(int errorCode) { numberOfFailovers++; } @@ -68,9 +74,7 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co } @Override - public void close() { - - } + public void close() {} @Override public void setError(Connection connection, int errorCode) { @@ -109,7 +113,6 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co } } - public interface ResponseHandler extends Runnable { RequestWaiter requestWaiter(); diff --git a/config/src/main/java/com/yahoo/vespa/config/Connection.java b/config/src/main/java/com/yahoo/vespa/config/Connection.java index 3d487198450..e39175a3a78 100644 --- a/config/src/main/java/com/yahoo/vespa/config/Connection.java +++ b/config/src/main/java/com/yahoo/vespa/config/Connection.java @@ -11,6 +11,8 @@ public interface Connection { void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter); + void invokeSync(Request request, double jrtTimeout); + void setError(int errorCode); void setSuccess(); diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java index 96dd6f62244..01da823b87b 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java @@ -13,6 +13,7 @@ import java.util.logging.Logger; * @author <a href="mailto:gunnarga@yahoo-inc.com">Gunnar Gauslaa Bergem</a> */ public class JRTConnection implements Connection { + public final static Logger logger = Logger.getLogger(JRTConnection.class.getPackage().getName()); private final String address; private final Supervisor supervisor; @@ -30,17 +31,20 @@ public class JRTConnection implements Connection { yyyyMMddz.setTimeZone(TimeZone.getTimeZone("GMT")); } + + public JRTConnection(String address, Supervisor supervisor) { + this.address = address; + this.supervisor = supervisor; + } + @Override public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) { getTarget().invokeAsync(request, jrtTimeout, requestWaiter); } - public final static Logger logger = Logger.getLogger(JRTConnection.class.getPackage().getName()); - - - public JRTConnection(String address, Supervisor supervisor) { - this.address = address; - this.supervisor = supervisor; + @Override + public void invokeSync(Request request, double jrtTimeout) { + getTarget().invokeSync(request, jrtTimeout); } public String getAddress() { diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java index b27f83851b4..bb8f7e9f9ce 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java @@ -21,7 +21,7 @@ import java.util.logging.Logger; * The current connection is available with {@link #getCurrent()}. * When calling {@link #setError(Connection, int)}, {#link #setNewCurrentConnection} will always be called. * - * @author <a href="mailto:gunnarga@yahoo-inc.com">Gunnar Gauslaa Bergem</a> + * @author Gunnar Gauslaa Bergem * @author hmusum */ public class JRTConnectionPool implements ConnectionPool { |