diff options
8 files changed, 146 insertions, 66 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java index 46b1ffc721e..b0fbc7acd33 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java @@ -59,18 +59,9 @@ public class FileDistributionRpcServer { .methodDesc("set which file references to download") .paramDesc(0, "file references", "file reference to download") .returnDesc(0, "ret", "0 if success, 1 otherwise")); - supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing - this, "receiveFile") - .methodDesc("receive file reference content") - .paramDesc(0, "file references", "file reference to download") - .paramDesc(1, "filename", "filename") - .paramDesc(2, "content", "array of bytes") - .paramDesc(3, "hash", "xx64hash of the file content") - .paramDesc(4, "errorcode", "Error code. 0 if none") - .paramDesc(5, "error-description", "Error description.") - .returnDesc(0, "ret", "0 if success, 1 otherwise")); } + //---------------- RPC methods ------------------------------------ // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? private static final int baseErrorCode = 0x10000; @@ -132,24 +123,5 @@ public class FileDistributionRpcServer { req.returnValues().add(new Int32Value(0)); } - @SuppressWarnings({"UnusedDeclaration"}) - public final void receiveFile(Request req) { - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - String filename = req.parameters().get(1).asString(); - byte[] content = req.parameters().get(2).asData(); - long xxhash = req.parameters().get(3).asInt64(); - int errorCode = req.parameters().get(4).asInt32(); - String errorDescription = req.parameters().get(5).asString(); - - if (errorCode == 0) { - // TODO: Remove when system test works - log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); - downloader.receiveFile(fileReference, filename, content); - req.returnValues().add(new Int32Value(0)); - } else { - log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); - req.returnValues().add(new Int32Value(1)); - // TODO: Add error description return value here too? - } - } + } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java new file mode 100644 index 00000000000..314c90c5853 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java @@ -0,0 +1,87 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Supervisor; +import com.yahoo.log.LogLevel; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.logging.Logger; + +public class FileReceiver { + + private final static Logger log = Logger.getLogger(FileReceiver.class.getName()); + + private final Supervisor supervisor; + private final FileReferenceDownloader downloader; + private final File downloadDirectory; + + public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { + this.supervisor = supervisor; + this.downloader = downloader; + this.downloadDirectory = downloadDirectory; + registerMethods(); + } + + private void registerMethods() { + supervisor.addMethod(receiveFileMethod(this)); + } + + // Defined here so that it can be added to supervisor used by client (server will use same connection when calling + // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method + private Method receiveFileMethod(Object handler) { + return new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing + handler, "receiveFile") + .methodDesc("receive file reference content") + .paramDesc(0, "file references", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "content", "array of bytes") + .paramDesc(3, "hash", "xx64hash of the file content") + .paramDesc(4, "errorcode", "Error code. 0 if none") + .paramDesc(5, "error-description", "Error description.") + .returnDesc(0, "ret", "0 if success, 1 otherwise"); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String filename = req.parameters().get(1).asString(); + byte[] content = req.parameters().get(2).asData(); + long xxhash = req.parameters().get(3).asInt64(); + int errorCode = req.parameters().get(4).asInt32(); + String errorDescription = req.parameters().get(5).asString(); + + if (errorCode == 0) { + // TODO: Remove when system test works + log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); + receiveFile(fileReference, filename, content); + req.returnValues().add(new Int32Value(0)); + } else { + log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); + req.returnValues().add(new Int32Value(1)); + // TODO: Add error description return value here too? + } + } + + void receiveFile(FileReference fileReference, String filename, byte[] content) { + File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + File file = new File(fileReferenceDir, filename); + log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); + Files.write(file.toPath(), content); + downloader.completedDownloading(fileReference, file); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); + throw new RuntimeException("Failed writing file: ", e); + } + } + + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java index 4b32ffab2b7..eaa791bec29 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -12,8 +12,6 @@ import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; import java.io.File; -import java.io.IOException; -import java.nio.file.Files; import java.time.Duration; import java.util.HashMap; import java.util.LinkedHashMap; @@ -44,7 +42,6 @@ class FileReferenceDownloader { private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName()); private final static Duration rpcTimeout = Duration.ofSeconds(10); - private final File downloadDirectory; private final ExecutorService downloadExecutor = Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader")); private ExecutorService readFromQueueExecutor = @@ -54,13 +51,13 @@ class FileReferenceDownloader { private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>(); private final Map<FileReference, Double> downloadStatus = new HashMap<>(); private final Duration downloadTimeout; + private final FileReceiver fileReceiver; FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) { - this.downloadDirectory = downloadDirectory; this.connectionPool = connectionPool; this.downloadTimeout = timeout; - if (connectionPool != null) - readFromQueueExecutor.submit(this::readFromQueue); + readFromQueueExecutor.submit(this::readFromQueue); + this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory); } private synchronized Optional<File> startDownload(FileReference fileReference, @@ -83,17 +80,7 @@ class FileReferenceDownloader { } void receiveFile(FileReference fileReference, String filename, byte[] content) { - File fileReferenceDir = new File(downloadDirectory, fileReference.value()); - try { - Files.createDirectories(fileReferenceDir.toPath()); - File file = new File(fileReferenceDir, filename); - log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath()); - Files.write(file.toPath(), content); - completedDownloading(fileReference, file); - } catch (IOException e) { - log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage()); - throw new RuntimeException("Failed writing file: ", e); - } + fileReceiver.receiveFile(fileReference, filename, content); } synchronized Set<FileReference> queuedDownloads() { @@ -117,7 +104,7 @@ class FileReferenceDownloader { } while (true); } - private synchronized void completedDownloading(FileReference fileReference, File file) { + void completedDownloading(FileReference fileReference, File file) { if (downloads.containsKey(fileReference)) downloads.get(fileReference).future().set(Optional.of(file)); downloadStatus.put(fileReference, 100.0); diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java index cad3d2d0330..0959919a46d 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.nio.file.Files; import java.time.Duration; import java.util.Arrays; -import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; @@ -32,14 +31,14 @@ public class FileDownloaderTest { private MockConnection connection; private FileDownloader fileDownloader; + private File downloadDir; @Before public void setup() { try { - File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + downloadDir = Files.createTempDirectory("filedistribution").toFile(); connection = new MockConnection(); fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); - FileDistributionRpcServer rpcServer = new FileDistributionRpcServer(new Supervisor(new Transport()), fileDownloader); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -110,16 +109,16 @@ public class FileDownloaderTest { @Test public void setFilesToDownload() throws IOException { + Duration timeout = Duration.ofMillis(200); File downloadDir = Files.createTempDirectory("filedistribution").toFile(); - FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200)); + MockConnection connectionPool = new MockConnection(); + connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); + FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout); FileReference foo = new FileReference("foo"); FileReference bar = new FileReference("bar"); List<FileReference> fileReferences = Arrays.asList(foo, bar); fileDownloader.queueForDownload(fileReferences); - // All requested file references should be in queue (since FileDownloader was created without ConnectionPool) - assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads())); - // Verify download status assertDownloadStatus(fileDownloader, foo, 0.0); assertDownloadStatus(fileDownloader, bar, 0.0); @@ -127,8 +126,6 @@ public class FileDownloaderTest { @Test public void receiveFile() throws IOException { - File downloadDir = Files.createTempDirectory("filedistribution").toFile(); - FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200)); FileReference foo = new FileReference("foo"); String filename = "foo.jar"; fileDownloader.receiveFile(foo, filename, Utf8.toBytes("content")); @@ -209,11 +206,20 @@ public class FileDownloaderTest { return 1; } - public void setResponseHandler(ResponseHandler responseHandler) { + @Override + public Supervisor getSupervisor() { + return new Supervisor(new Transport()); + } + + void setResponseHandler(ResponseHandler responseHandler) { this.responseHandler = responseHandler; } - static class FileReferenceFoundResponseHandler implements ResponseHandler { + public interface ResponseHandler { + void request(Request request); + } + + static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler { @Override public void request(Request request) { @@ -224,7 +230,7 @@ public class FileDownloaderTest { } } - static class UnknownFileReferenceResponseHandler implements ResponseHandler { + static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler { @Override public void request(Request request) { @@ -235,12 +241,26 @@ public class FileDownloaderTest { } } - public interface ResponseHandler { + static class WaitResponseHandler implements MockConnection.ResponseHandler { - void request(Request request); + private final Duration waitUntilAnswering; - } + WaitResponseHandler(Duration waitUntilAnswering) { + super(); + this.waitUntilAnswering = waitUntilAnswering; + } + + @Override + public void request(Request request) { + try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ } + if (request.methodName().equals("filedistribution.serveFile")) { + request.returnValues().add(new Int32Value(0)); + request.returnValues().add(new StringValue("OK")); + } + + } + } } } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java index bd9a49c2fe2..cb623437dba 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java @@ -3,6 +3,7 @@ package com.yahoo.config.subscription.impl; import com.yahoo.jrt.Request; import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.Supervisor; import com.yahoo.vespa.config.ConfigPayload; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; @@ -96,6 +97,11 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co return numSpecs; } + @Override + public Supervisor getSupervisor() { + return null; + } + public int getNumberOfRequests() { return numberOfRequests; } diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java index 8ea2800e65e..5a6f8a8848b 100644 --- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config; +import com.yahoo.jrt.Supervisor; + /** * @author hmusum */ @@ -15,4 +17,6 @@ public interface ConnectionPool { Connection setNewCurrentConnection(); int getSize(); + + Supervisor getSupervisor(); } diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java index bb8f7e9f9ce..efeaacf225b 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java @@ -25,6 +25,7 @@ import java.util.logging.Logger; * @author hmusum */ public class JRTConnectionPool implements ConnectionPool { + private static final Logger log = Logger.getLogger(JRTConnectionPool.class.getName()); private final Supervisor supervisor = new Supervisor(new Transport()); @@ -150,4 +151,9 @@ public class JRTConnectionPool implements ConnectionPool { } } + @Override + public Supervisor getSupervisor() { + return supervisor; + } + } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java index 79c541d7b1a..0d1aae97690 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java @@ -15,9 +15,7 @@ public class ApplicationFileManager implements AddFileInterface { @Override public FileReference addFile(String relativePath, FileReference reference) { - // TODO Wire in when verified in system test - // return master.addFile(new File(applicationDir, relativePath), reference); - return reference; + return master.addFile(new File(applicationDir, relativePath), reference); } @Override |