diff options
author | Harald Musum <musum@yahoo-inc.com> | 2017-11-09 13:14:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-09 13:14:45 +0100 |
commit | 8d42bb22e208a754c4e17ac935edd457c81408c6 (patch) | |
tree | 7548a20caac4552becafbceeba5fb144c4a59017 /config-proxy/src | |
parent | 82c47fcb7f049faed6990e79bc149e567579c3ac (diff) |
Revert "Move file distribution stuff to filedistribution module"
Diffstat (limited to 'config-proxy/src')
7 files changed, 710 insertions, 12 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index a599f820384..ae0360fecf2 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.config.FileReference; import com.yahoo.jrt.*; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.*; @@ -9,10 +10,16 @@ import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; +import java.io.File; import java.util.Arrays; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** @@ -20,20 +27,21 @@ import java.util.logging.Logger; * * @author hmusum */ +// TODO: Rename now that it also support file distribution request public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); private static final int TRACELEVEL = 6; private final Spec spec; - private final Supervisor supervisor; + private final Supervisor supervisor = new Supervisor(new Transport()); private final ProxyServer proxyServer; - ConfigProxyRpcServer(ProxyServer proxyServer, Supervisor supervisor, Spec spec) { + ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) { this.proxyServer = proxyServer; - this.supervisor = supervisor; this.spec = spec; declareConfigMethods(); + declareFileDistributionMethods(); } public void run() { @@ -101,6 +109,40 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer .returnDesc(0, "ret", "Empty string or error message")); } + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", + this, "getActiveFileReferencesStatus") + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", + this, "setFileReferencesToDownload") + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing + this, "receiveFile") + .methodDesc("receive file reference content") + .paramDesc(0, "file references", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "content", "array of bytes") + .paramDesc(3, "hash", "xx64hash of the file content") + .paramDesc(4, "errorcode", "Error code. 0 if none") + .paramDesc(5, "error-description", "Error description.") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + //---------------- RPC methods ------------------------------------ /** @@ -207,6 +249,75 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); } + // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? + private static final int baseErrorCode = 0x10000; + private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + private static final int fileReferenceInternalError = fileReferenceRemoved + 1; + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getFile(Request req) { + req.detach(); + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = proxyServer.fileDownloader().getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getActiveFileReferencesStatus(Request req) { + Map<FileReference, Double> downloadStatus = proxyServer.fileDownloader().downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void setFileReferencesToDownload(Request req) { + String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); + List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + .map(FileReference::new) + .collect(Collectors.toList()); + proxyServer.fileDownloader().queueForDownload(fileReferences); + + req.returnValues().add(new Int32Value(0)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String filename = req.parameters().get(1).asString(); + byte[] content = req.parameters().get(2).asData(); + proxyServer.fileDownloader().receiveFile(fileReference, filename, content); + req.returnValues().add(new Int32Value(0)); + } + //---------------------------------------------------- private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 274cb3ba433..5668852311f 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -5,8 +5,6 @@ import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Transport; import com.yahoo.log.LogLevel; import com.yahoo.log.LogSetup; import com.yahoo.log.event.Event; @@ -42,7 +40,6 @@ public class ProxyServer implements Runnable { // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); - private final Supervisor supervisor = new Supervisor(new Transport()); private final ClientUpdater clientUpdater; private ScheduledFuture<?> delayedResponseScheduler; @@ -87,7 +84,6 @@ public class ProxyServer implements Runnable { clientUpdater = new ClientUpdater(rpcServer, statistics, delayedResponses); this.configClient = createClient(clientUpdater, delayedResponses, source, timingValues, memoryCache, configClient); this.fileDownloader = new FileDownloader(new JRTConnectionPool(source)); - new com.yahoo.vespa.filedistribution.RpcServer(supervisor, fileDownloader); } static ProxyServer createTestServer(ConfigSourceSet source) { @@ -166,7 +162,7 @@ public class ProxyServer implements Runnable { } private ConfigProxyRpcServer createRpcServer(Spec spec) { - return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' + return (spec == null) ? null : new ConfigProxyRpcServer(this, spec); // TODO: Try to avoid first argument being 'this' } private RpcConfigSourceClient createRpcClient() { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java new file mode 100644 index 00000000000..f3c694f31ab --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java @@ -0,0 +1,153 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.ConnectionPool; +import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; + +import java.io.File; +import java.time.Duration; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +/** + * Handles downloads of files (file references only for now) + * + * @author hmusum + */ +public class FileDownloader { + + private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); + + private final File downloadDirectory; + private final Duration timeout; + private final FileReferenceDownloader fileReferenceDownloader; + + public FileDownloader(ConnectionPool connectionPool) { + this(connectionPool, + new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), + Duration.ofMinutes(15)); + } + + FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout) { + this.downloadDirectory = downloadDirectory; + this.timeout = timeout; + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, connectionPool, timeout); + } + + public Optional<File> getFile(FileReference fileReference) { + Objects.requireNonNull(fileReference, "file reference cannot be null"); + File directory = new File(downloadDirectory, fileReference.value()); + log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); + + Optional<File> file = getFileFromFileSystem(fileReference, directory); + if (file.isPresent()) { + return file; + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + + directory.getAbsolutePath() + ", starting download"); + return queueForDownload(fileReference, timeout); + } + } + + public void queueForDownload(List<FileReference> fileReferences) { + fileReferences.forEach(this::queueForDownload); + } + + public void receiveFile(FileReference fileReference, String filename, byte[] content) { + fileReferenceDownloader.receiveFile(fileReference, filename, content); + } + + double downloadStatus(FileReference fileReference) { + return fileReferenceDownloader.downloadStatus(fileReference.value()); + } + + public Map<FileReference, Double> downloadStatus() { + return fileReferenceDownloader.downloadStatus(); + } + + File downloadDirectory() { + return downloadDirectory; + } + + private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) { + File[] files = directory.listFiles(); + if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + if (files.length != 1) { + throw new RuntimeException("More than one file in '" + fileReference.value() + + "', expected only one, unable to proceed"); + } + File file = files[0]; + if (!file.exists()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "' does not exist"); + } else if (!file.canRead()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "'exists, but unable to read it"); + } else { + fileReferenceDownloader.setDownloadStatus(fileReference.value(), 100.0); + return Optional.of(file); + } + } + return Optional.empty(); + } + + private synchronized Optional<File> queueForDownload(FileReference fileReference, Duration timeout) { + if (fileReferenceDownloader.isDownloading(fileReference)) { + log.log(LogLevel.INFO, "Already downloading '" + fileReference.value() + "'"); + ListenableFuture<Optional<File>> future = + fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed downloading file reference '" + fileReference.value() + "': " + + Exceptions.toMessageString(e)); + } + } + + SettableFuture<Optional<File>> future = SettableFuture.create(); + queueForDownload(new FileReferenceDownload(fileReference, future)); + log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); + + try { + Optional<File> fileDownloaded; + try { + log.log(LogLevel.INFO, "Waiting for '" + fileReference.value() + "' to download"); + fileDownloaded = future.get(timeout.getSeconds() - 1, TimeUnit.SECONDS); + log.log(LogLevel.INFO, "'" + fileReference.value() + "' downloaded"); + } catch (TimeoutException e) { + log.log(LogLevel.WARNING, "Downloading '" + fileReference.value() + "' timed out"); + return Optional.empty(); + } + return fileDownloaded; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Could not download '" + fileReference.value() + "'"); + } + } + + // We don't care about the future in this call + private synchronized void queueForDownload(FileReference fileReference) { + queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); + } + + private synchronized void queueForDownload(FileReferenceDownload fileReferenceDownload) { + fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); + } + + Set<FileReference> queuedDownloads() { + return fileReferenceDownloader.queuedDownloads(); + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java new file mode 100644 index 00000000000..ce5a30dc7ad --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownload.java @@ -0,0 +1,28 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.config.FileReference; + +import java.io.File; +import java.util.Optional; + +public class FileReferenceDownload { + private final FileReference fileReference; + private final SettableFuture<Optional<File>> future; + + FileReferenceDownload(FileReference fileReference, SettableFuture<Optional<File>> future) { + this.fileReference = fileReference; + this.future = future; + } + + FileReference fileReference() { + return fileReference; + } + + SettableFuture<Optional<File>> future() { + return future; + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java new file mode 100644 index 00000000000..917374740f1 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -0,0 +1,183 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.FileReference; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringValue; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Downloads file reference using rpc requests to config server and keeps track of files being downloaded + * <p> + * Some methods are synchronized to make sure access to downloads is atomic + * + * @author hmusum + */ +// TODO: Add retries when a config server does not have a file reference +// TODO: Handle shutdown of executors +class FileReferenceDownloader { + + private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); + private final static Duration rpcTimeout = Duration.ofSeconds(10); + + private final File downloadDirectory; + private final ExecutorService downloadExecutor = + Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); + private ExecutorService readFromQueueExecutor = + Executors.newFixedThreadPool(1, new DaemonThreadFactory("filereference download queue")); + private final ConnectionPool connectionPool; + private final ConcurrentLinkedQueue<FileReferenceDownload> downloadQueue = new ConcurrentLinkedQueue<>(); + private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); + private final Map<FileReference, Double> downloadStatus = new HashMap<>(); + private final Duration downloadTimeout; + + FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { + this.downloadDirectory = downloadDirectory; + this.connectionPool = connectionPool; + this.downloadTimeout = timeout; + if (connectionPool != null) + readFromQueueExecutor.submit(this::readFromQueue); + } + + private synchronized Optional<File> startDownload(FileReference fileReference, + Duration timeout, + FileReferenceDownload fileReferenceDownload) + throws ExecutionException, InterruptedException, TimeoutException { + downloads.put(fileReference, fileReferenceDownload); + setDownloadStatus(fileReference.value(), 0.0); + if (startDownloadRpc(fileReference)) + return fileReferenceDownload.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS); + else { + fileReferenceDownload.future().setException(new RuntimeException("Failed getting file")); + downloads.remove(fileReference); + return Optional.empty(); + } + } + + synchronized void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { + downloadQueue.add(fileReferenceDownload); + } + + void receiveFile(FileReference fileReference, String filename, byte[] content) { + File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + File file = new File(fileReferenceDir, filename); + log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); + Files.write(file.toPath(), content); + completedDownloading(fileReference, file); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); + throw new RuntimeException("Failed writing file: ", e); + } + } + + synchronized Set<FileReference> queuedDownloads() { + return downloadQueue.stream() + .map(FileReferenceDownload::fileReference) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private void readFromQueue() { + do { + FileReferenceDownload fileReferenceDownload = downloadQueue.poll(); + if (fileReferenceDownload == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { /* ignore for now */} + } else { + log.log(LogLevel.INFO, "Polling queue, found file reference '" + + fileReferenceDownload.fileReference().value() + "' to download"); + downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + } + } while (true); + } + + private synchronized void completedDownloading(FileReference fileReference, File file) { + downloads.get(fileReference).future().set(Optional.of(file)); + downloadStatus.put(fileReference, 100.0); + } + + private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { + Connection connection = connectionPool.getCurrent(); + Request request = new Request("filedistribution.serveFile"); + request.parameters().add(new StringValue(fileReference.value())); + + execute(request, connection); + if (validateResponse(request)) { + log.log(LogLevel.DEBUG, "Request callback, OK. Req: " + request + "\nSpec: " + connection); + if (request.returnValues().get(0).asInt32() == 0) + log.log(LogLevel.INFO, "Found file reference '" + fileReference.value() + "' available at " + connection.getAddress()); + else + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found for " + connection.getAddress()); + return true; + } else { + log.log(LogLevel.WARNING, "Request failed. Req: " + request + "\nSpec: " + connection.getAddress()); + connection.setError(request.errorCode()); + // TODO: Retry with another config server + return false; + } + } + + synchronized boolean isDownloading(FileReference fileReference) { + return downloads.containsKey(fileReference); + } + + synchronized ListenableFuture<Optional<File>> addDownloadListener(FileReference fileReference, Runnable runnable) { + FileReferenceDownload fileReferenceDownload = downloads.get(fileReference); + fileReferenceDownload.future().addListener(runnable, downloadExecutor); + return fileReferenceDownload.future(); + } + + private void execute(Request request, Connection connection) { + connection.invokeSync(request, (double) rpcTimeout.getSeconds()); + } + + private boolean validateResponse(Request request) { + if (request.isError()) { + return false; + } else if (request.returnValues().size() == 0) { + return false; + } else if (!request.checkReturnTypes("i")) { + log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage()); + return false; + } + return true; + } + + double downloadStatus(String file) { + return downloadStatus.getOrDefault(new FileReference(file), 0.0); + } + + void setDownloadStatus(String file, double percentageDownloaded) { + downloadStatus.put(new FileReference(file), percentageDownloaded); + } + + Map<FileReference, Double> downloadStatus() { + return ImmutableMap.copyOf(downloadStatus); + } +} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index 4a9d2acb4c5..f9b334a6f87 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -5,8 +5,6 @@ import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; import com.yahoo.jrt.StringValue; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.RawConfig; import org.junit.After; import org.junit.Before; @@ -30,7 +28,7 @@ public class ConfigProxyRpcServerTest { @Before public void setup() { proxyServer = ProxyServer.createTestServer(new ConfigSourceSet(address)); - rpcServer = new ConfigProxyRpcServer(proxyServer, new Supervisor(new Transport()), null); + rpcServer = new ConfigProxyRpcServer(proxyServer, null); } @After @@ -42,7 +40,7 @@ public class ConfigProxyRpcServerTest { public void basic() { ProxyServer proxy = ProxyServer.createTestServer(new MockConfigSource(new MockClientUpdater())); Spec spec = new Spec("localhost", 12345); - ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, new Supervisor(new Transport()), spec); + ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, spec); assertThat(server.getSpec(), is(spec)); } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java new file mode 100644 index 00000000000..18d49e9a224 --- /dev/null +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -0,0 +1,229 @@ +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.io.IOUtils; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.text.Utf8; +import com.yahoo.vespa.config.Connection; +import com.yahoo.vespa.config.ConnectionPool; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FileDownloaderTest { + + private MockConnection connection; + private FileDownloader fileDownloader; + + @Before + public void setup() { + try { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + connection = new MockConnection(); + fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void getFile() throws IOException { + File downloadDir = fileDownloader.downloadDirectory(); + + { + // fileReference already exists on disk, does not have to be downloaded + + String fileReferenceString = "foo"; + String filename = "foo.jar"; + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString); + FileReference fileReference = new FileReference(fileReferenceString); + writeFileReference(downloadDir, fileReferenceString, filename); + + // Check that we get correct path and content when asking for file reference + Optional<File> pathToFile = fileDownloader.getFile(fileReference); + assertTrue(pathToFile.isPresent()); + String downloadedFile = new File(fileReferenceFullPath, filename).getAbsolutePath(); + assertEquals(new File(fileReferenceFullPath, filename).getAbsolutePath(), downloadedFile); + assertEquals("content", IOUtils.readFile(pathToFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded, but fails when asking upstream for file) + + connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler()); + + FileReference fileReference = new FileReference("bar"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status when unable to download + assertDownloadStatus(fileDownloader, fileReference, 0.0); + } + + { + // fileReference does not exist on disk, needs to be downloaded) + + FileReference fileReference = new FileReference("fileReference"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status + assertDownloadStatus(fileDownloader, fileReference, 0.0); + + // Receives fileReference, should return and make it available to caller + String filename = "abc.jar"; + fileDownloader.receiveFile(fileReference, filename, Utf8.toBytes("some other content")); + Optional<File> downloadedFile = fileDownloader.getFile(fileReference); + + assertTrue(downloadedFile.isPresent()); + File downloadedFileFullPath = new File(fileReferenceFullPath, filename); + assertEquals(downloadedFileFullPath.getAbsolutePath(), downloadedFile.get().getAbsolutePath()); + assertEquals("some other content", IOUtils.readFile(downloadedFile.get())); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } + } + + @Test + public void setFilesToDownload() throws IOException { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200)); + FileReference foo = new FileReference("foo"); + FileReference bar = new FileReference("bar"); + List<FileReference> fileReferences = Arrays.asList(foo, bar); + fileDownloader.queueForDownload(fileReferences); + + // All requested file references should be in queue (since FileDownloader was created without ConnectionPool) + assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads())); + + // Verify download status + assertDownloadStatus(fileDownloader, foo, 0.0); + assertDownloadStatus(fileDownloader, bar, 0.0); + } + + private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { + File file = new File(new File(dir, fileReferenceString), fileName); + IOUtils.writeFile(file, "content", false); + } + + private File fileReferenceFullPath(File dir, String fileReferenceString) { + return new File(dir, fileReferenceString); + } + + private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) { + double downloadStatus = fileDownloader.downloadStatus(fileReference); + assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); + } + + private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { + + private ResponseHandler responseHandler; + + MockConnection() { + this(new FileReferenceFoundResponseHandler()); + } + + MockConnection(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + @Override + public void invokeAsync(Request request, double jrtTimeout, RequestWaiter requestWaiter) { + responseHandler.request(request); + } + + @Override + public void invokeSync(Request request, double jrtTimeout) { + responseHandler.request(request); + } + + @Override + public void setError(int errorCode) { + } + + @Override + public void setSuccess() { + } + + @Override + public String getAddress() { + return null; + } + + @Override + public void close() { + } + + @Override + public void setError(Connection connection, int errorCode) { + connection.setError(errorCode); + } + + @Override + public Connection getCurrent() { + return this; + } + + @Override + public Connection setNewCurrentConnection() { + return this; + } + + @Override + public int getSize() { + return 1; + } + + public void setResponseHandler(ResponseHandler responseHandler) { + this.responseHandler = responseHandler; + } + + static class FileReferenceFoundResponseHandler implements ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) + request.returnValues().add(new Int32Value(0)); + } + } + + static class UnknownFileReferenceResponseHandler implements ResponseHandler { + + @Override + public void request(Request request) { + if (request.methodName().equals("filedistribution.serveFile")) + request.returnValues().add(new Int32Value(1)); + } + } + + public interface ResponseHandler { + + void request(Request request); + + } + + } + +} |