summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-11-14 08:19:04 +0100
committerHarald Musum <musum@oath.com>2017-11-14 08:19:04 +0100
commit6a9c6010fb290a7343285b44827cb9a7fa558bff (patch)
treea9e2346b0c7f203ca9060868279690a2aa92591b /config-proxy
parent795122be6d0676bbedf99d2413d1db2c2b64098a (diff)
Register receiveFile with the client supervisor
Since the server will use the same connection to call receiveFile after the client have called serveFile we need to register the RPC method using the same supervisor as the client uses. Also some preparations for making the code more reusable for others.
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java32
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java87
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java23
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java50
4 files changed, 129 insertions, 63 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
index 46b1ffc721e..b0fbc7acd33 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -59,18 +59,9 @@ public class FileDistributionRpcServer {
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
- supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
- this, "receiveFile")
- .methodDesc("receive file reference content")
- .paramDesc(0, "file references", "file reference to download")
- .paramDesc(1, "filename", "filename")
- .paramDesc(2, "content", "array of bytes")
- .paramDesc(3, "hash", "xx64hash of the file content")
- .paramDesc(4, "errorcode", "Error code. 0 if none")
- .paramDesc(5, "error-description", "Error description.")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
+
//---------------- RPC methods ------------------------------------
// TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
private static final int baseErrorCode = 0x10000;
@@ -132,24 +123,5 @@ public class FileDistributionRpcServer {
req.returnValues().add(new Int32Value(0));
}
- @SuppressWarnings({"UnusedDeclaration"})
- public final void receiveFile(Request req) {
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- String filename = req.parameters().get(1).asString();
- byte[] content = req.parameters().get(2).asData();
- long xxhash = req.parameters().get(3).asInt64();
- int errorCode = req.parameters().get(4).asInt32();
- String errorDescription = req.parameters().get(5).asString();
-
- if (errorCode == 0) {
- // TODO: Remove when system test works
- log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
- downloader.receiveFile(fileReference, filename, content);
- req.returnValues().add(new Int32Value(0));
- } else {
- log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
- req.returnValues().add(new Int32Value(1));
- // TODO: Add error description return value here too?
- }
- }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
new file mode 100644
index 00000000000..314c90c5853
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
@@ -0,0 +1,87 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+public class FileReceiver {
+
+ private final static Logger log = Logger.getLogger(FileReceiver.class.getName());
+
+ private final Supervisor supervisor;
+ private final FileReferenceDownloader downloader;
+ private final File downloadDirectory;
+
+ public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ this.downloadDirectory = downloadDirectory;
+ registerMethods();
+ }
+
+ private void registerMethods() {
+ supervisor.addMethod(receiveFileMethod(this));
+ }
+
+ // Defined here so that it can be added to supervisor used by client (server will use same connection when calling
+ // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method
+ private Method receiveFileMethod(Object handler) {
+ return new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
+ handler, "receiveFile")
+ .methodDesc("receive file reference content")
+ .paramDesc(0, "file references", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "content", "array of bytes")
+ .paramDesc(3, "hash", "xx64hash of the file content")
+ .paramDesc(4, "errorcode", "Error code. 0 if none")
+ .paramDesc(5, "error-description", "Error description.")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise");
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String filename = req.parameters().get(1).asString();
+ byte[] content = req.parameters().get(2).asData();
+ long xxhash = req.parameters().get(3).asInt64();
+ int errorCode = req.parameters().get(4).asInt32();
+ String errorDescription = req.parameters().get(5).asString();
+
+ if (errorCode == 0) {
+ // TODO: Remove when system test works
+ log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
+ receiveFile(fileReference, filename, content);
+ req.returnValues().add(new Int32Value(0));
+ } else {
+ log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
+ req.returnValues().add(new Int32Value(1));
+ // TODO: Add error description return value here too?
+ }
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content) {
+ File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ File file = new File(fileReferenceDir, filename);
+ log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
+ Files.write(file.toPath(), content);
+ downloader.completedDownloading(fileReference, file);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
+ throw new RuntimeException("Failed writing file: ", e);
+ }
+ }
+
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
index 4b32ffab2b7..eaa791bec29 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -12,8 +12,6 @@ import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -44,7 +42,6 @@ class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(10);
- private final File downloadDirectory;
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
private ExecutorService readFromQueueExecutor =
@@ -54,13 +51,13 @@ class FileReferenceDownloader {
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
+ private final FileReceiver fileReceiver;
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
- this.downloadDirectory = downloadDirectory;
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- if (connectionPool != null)
- readFromQueueExecutor.submit(this::readFromQueue);
+ readFromQueueExecutor.submit(this::readFromQueue);
+ this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
private synchronized Optional<File> startDownload(FileReference fileReference,
@@ -83,17 +80,7 @@ class FileReferenceDownloader {
}
void receiveFile(FileReference fileReference, String filename, byte[] content) {
- File fileReferenceDir = new File(downloadDirectory, fileReference.value());
- try {
- Files.createDirectories(fileReferenceDir.toPath());
- File file = new File(fileReferenceDir, filename);
- log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
- Files.write(file.toPath(), content);
- completedDownloading(fileReference, file);
- } catch (IOException e) {
- log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
- throw new RuntimeException("Failed writing file: ", e);
- }
+ fileReceiver.receiveFile(fileReference, filename, content);
}
synchronized Set<FileReference> queuedDownloads() {
@@ -117,7 +104,7 @@ class FileReferenceDownloader {
} while (true);
}
- private synchronized void completedDownloading(FileReference fileReference, File file) {
+ void completedDownloading(FileReference fileReference, File file) {
if (downloads.containsKey(fileReference))
downloads.get(fileReference).future().set(Optional.of(file));
downloadStatus.put(fileReference, 100.0);
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
index cad3d2d0330..0959919a46d 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -19,7 +19,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
@@ -32,14 +31,14 @@ public class FileDownloaderTest {
private MockConnection connection;
private FileDownloader fileDownloader;
+ private File downloadDir;
@Before
public void setup() {
try {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ downloadDir = Files.createTempDirectory("filedistribution").toFile();
connection = new MockConnection();
fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000));
- FileDistributionRpcServer rpcServer = new FileDistributionRpcServer(new Supervisor(new Transport()), fileDownloader);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -110,16 +109,16 @@ public class FileDownloaderTest {
@Test
public void setFilesToDownload() throws IOException {
+ Duration timeout = Duration.ofMillis(200);
File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
+ MockConnection connectionPool = new MockConnection();
+ connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
+ FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout);
FileReference foo = new FileReference("foo");
FileReference bar = new FileReference("bar");
List<FileReference> fileReferences = Arrays.asList(foo, bar);
fileDownloader.queueForDownload(fileReferences);
- // All requested file references should be in queue (since FileDownloader was created without ConnectionPool)
- assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads()));
-
// Verify download status
assertDownloadStatus(fileDownloader, foo, 0.0);
assertDownloadStatus(fileDownloader, bar, 0.0);
@@ -127,8 +126,6 @@ public class FileDownloaderTest {
@Test
public void receiveFile() throws IOException {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
FileReference foo = new FileReference("foo");
String filename = "foo.jar";
fileDownloader.receiveFile(foo, filename, Utf8.toBytes("content"));
@@ -209,11 +206,20 @@ public class FileDownloaderTest {
return 1;
}
- public void setResponseHandler(ResponseHandler responseHandler) {
+ @Override
+ public Supervisor getSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
+ void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
- static class FileReferenceFoundResponseHandler implements ResponseHandler {
+ public interface ResponseHandler {
+ void request(Request request);
+ }
+
+ static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler {
@Override
public void request(Request request) {
@@ -224,7 +230,7 @@ public class FileDownloaderTest {
}
}
- static class UnknownFileReferenceResponseHandler implements ResponseHandler {
+ static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler {
@Override
public void request(Request request) {
@@ -235,12 +241,26 @@ public class FileDownloaderTest {
}
}
- public interface ResponseHandler {
+ static class WaitResponseHandler implements MockConnection.ResponseHandler {
- void request(Request request);
+ private final Duration waitUntilAnswering;
- }
+ WaitResponseHandler(Duration waitUntilAnswering) {
+ super();
+ this.waitUntilAnswering = waitUntilAnswering;
+ }
+
+ @Override
+ public void request(Request request) {
+ try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ }
+ if (request.methodName().equals("filedistribution.serveFile")) {
+ request.returnValues().add(new Int32Value(0));
+ request.returnValues().add(new StringValue("OK"));
+ }
+
+ }
+ }
}
}