summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-11-14 08:19:04 +0100
committerHarald Musum <musum@oath.com>2017-11-14 08:19:04 +0100
commit6a9c6010fb290a7343285b44827cb9a7fa558bff (patch)
treea9e2346b0c7f203ca9060868279690a2aa92591b
parent795122be6d0676bbedf99d2413d1db2c2b64098a (diff)
Register receiveFile with the client supervisor
Since the server will use the same connection to call receiveFile after the client have called serveFile we need to register the RPC method using the same supervisor as the client uses. Also some preparations for making the code more reusable for others.
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java32
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java87
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java23
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java50
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java6
-rw-r--r--config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java4
-rw-r--r--config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java6
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java4
8 files changed, 146 insertions, 66 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
index 46b1ffc721e..b0fbc7acd33 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -59,18 +59,9 @@ public class FileDistributionRpcServer {
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
- supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
- this, "receiveFile")
- .methodDesc("receive file reference content")
- .paramDesc(0, "file references", "file reference to download")
- .paramDesc(1, "filename", "filename")
- .paramDesc(2, "content", "array of bytes")
- .paramDesc(3, "hash", "xx64hash of the file content")
- .paramDesc(4, "errorcode", "Error code. 0 if none")
- .paramDesc(5, "error-description", "Error description.")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
+
//---------------- RPC methods ------------------------------------
// TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
private static final int baseErrorCode = 0x10000;
@@ -132,24 +123,5 @@ public class FileDistributionRpcServer {
req.returnValues().add(new Int32Value(0));
}
- @SuppressWarnings({"UnusedDeclaration"})
- public final void receiveFile(Request req) {
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- String filename = req.parameters().get(1).asString();
- byte[] content = req.parameters().get(2).asData();
- long xxhash = req.parameters().get(3).asInt64();
- int errorCode = req.parameters().get(4).asInt32();
- String errorDescription = req.parameters().get(5).asString();
-
- if (errorCode == 0) {
- // TODO: Remove when system test works
- log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
- downloader.receiveFile(fileReference, filename, content);
- req.returnValues().add(new Int32Value(0));
- } else {
- log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
- req.returnValues().add(new Int32Value(1));
- // TODO: Add error description return value here too?
- }
- }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
new file mode 100644
index 00000000000..314c90c5853
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
@@ -0,0 +1,87 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+public class FileReceiver {
+
+ private final static Logger log = Logger.getLogger(FileReceiver.class.getName());
+
+ private final Supervisor supervisor;
+ private final FileReferenceDownloader downloader;
+ private final File downloadDirectory;
+
+ public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ this.downloadDirectory = downloadDirectory;
+ registerMethods();
+ }
+
+ private void registerMethods() {
+ supervisor.addMethod(receiveFileMethod(this));
+ }
+
+ // Defined here so that it can be added to supervisor used by client (server will use same connection when calling
+ // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method
+ private Method receiveFileMethod(Object handler) {
+ return new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
+ handler, "receiveFile")
+ .methodDesc("receive file reference content")
+ .paramDesc(0, "file references", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "content", "array of bytes")
+ .paramDesc(3, "hash", "xx64hash of the file content")
+ .paramDesc(4, "errorcode", "Error code. 0 if none")
+ .paramDesc(5, "error-description", "Error description.")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise");
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String filename = req.parameters().get(1).asString();
+ byte[] content = req.parameters().get(2).asData();
+ long xxhash = req.parameters().get(3).asInt64();
+ int errorCode = req.parameters().get(4).asInt32();
+ String errorDescription = req.parameters().get(5).asString();
+
+ if (errorCode == 0) {
+ // TODO: Remove when system test works
+ log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
+ receiveFile(fileReference, filename, content);
+ req.returnValues().add(new Int32Value(0));
+ } else {
+ log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
+ req.returnValues().add(new Int32Value(1));
+ // TODO: Add error description return value here too?
+ }
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content) {
+ File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ File file = new File(fileReferenceDir, filename);
+ log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
+ Files.write(file.toPath(), content);
+ downloader.completedDownloading(fileReference, file);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
+ throw new RuntimeException("Failed writing file: ", e);
+ }
+ }
+
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
index 4b32ffab2b7..eaa791bec29 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -12,8 +12,6 @@ import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -44,7 +42,6 @@ class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(10);
- private final File downloadDirectory;
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
private ExecutorService readFromQueueExecutor =
@@ -54,13 +51,13 @@ class FileReferenceDownloader {
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
+ private final FileReceiver fileReceiver;
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
- this.downloadDirectory = downloadDirectory;
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- if (connectionPool != null)
- readFromQueueExecutor.submit(this::readFromQueue);
+ readFromQueueExecutor.submit(this::readFromQueue);
+ this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
private synchronized Optional<File> startDownload(FileReference fileReference,
@@ -83,17 +80,7 @@ class FileReferenceDownloader {
}
void receiveFile(FileReference fileReference, String filename, byte[] content) {
- File fileReferenceDir = new File(downloadDirectory, fileReference.value());
- try {
- Files.createDirectories(fileReferenceDir.toPath());
- File file = new File(fileReferenceDir, filename);
- log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
- Files.write(file.toPath(), content);
- completedDownloading(fileReference, file);
- } catch (IOException e) {
- log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
- throw new RuntimeException("Failed writing file: ", e);
- }
+ fileReceiver.receiveFile(fileReference, filename, content);
}
synchronized Set<FileReference> queuedDownloads() {
@@ -117,7 +104,7 @@ class FileReferenceDownloader {
} while (true);
}
- private synchronized void completedDownloading(FileReference fileReference, File file) {
+ void completedDownloading(FileReference fileReference, File file) {
if (downloads.containsKey(fileReference))
downloads.get(fileReference).future().set(Optional.of(file));
downloadStatus.put(fileReference, 100.0);
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
index cad3d2d0330..0959919a46d 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -19,7 +19,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
@@ -32,14 +31,14 @@ public class FileDownloaderTest {
private MockConnection connection;
private FileDownloader fileDownloader;
+ private File downloadDir;
@Before
public void setup() {
try {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ downloadDir = Files.createTempDirectory("filedistribution").toFile();
connection = new MockConnection();
fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000));
- FileDistributionRpcServer rpcServer = new FileDistributionRpcServer(new Supervisor(new Transport()), fileDownloader);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -110,16 +109,16 @@ public class FileDownloaderTest {
@Test
public void setFilesToDownload() throws IOException {
+ Duration timeout = Duration.ofMillis(200);
File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
+ MockConnection connectionPool = new MockConnection();
+ connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
+ FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout);
FileReference foo = new FileReference("foo");
FileReference bar = new FileReference("bar");
List<FileReference> fileReferences = Arrays.asList(foo, bar);
fileDownloader.queueForDownload(fileReferences);
- // All requested file references should be in queue (since FileDownloader was created without ConnectionPool)
- assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads()));
-
// Verify download status
assertDownloadStatus(fileDownloader, foo, 0.0);
assertDownloadStatus(fileDownloader, bar, 0.0);
@@ -127,8 +126,6 @@ public class FileDownloaderTest {
@Test
public void receiveFile() throws IOException {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
FileReference foo = new FileReference("foo");
String filename = "foo.jar";
fileDownloader.receiveFile(foo, filename, Utf8.toBytes("content"));
@@ -209,11 +206,20 @@ public class FileDownloaderTest {
return 1;
}
- public void setResponseHandler(ResponseHandler responseHandler) {
+ @Override
+ public Supervisor getSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
+ void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
- static class FileReferenceFoundResponseHandler implements ResponseHandler {
+ public interface ResponseHandler {
+ void request(Request request);
+ }
+
+ static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler {
@Override
public void request(Request request) {
@@ -224,7 +230,7 @@ public class FileDownloaderTest {
}
}
- static class UnknownFileReferenceResponseHandler implements ResponseHandler {
+ static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler {
@Override
public void request(Request request) {
@@ -235,12 +241,26 @@ public class FileDownloaderTest {
}
}
- public interface ResponseHandler {
+ static class WaitResponseHandler implements MockConnection.ResponseHandler {
- void request(Request request);
+ private final Duration waitUntilAnswering;
- }
+ WaitResponseHandler(Duration waitUntilAnswering) {
+ super();
+ this.waitUntilAnswering = waitUntilAnswering;
+ }
+
+ @Override
+ public void request(Request request) {
+ try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ }
+ if (request.methodName().equals("filedistribution.serveFile")) {
+ request.returnValues().add(new Int32Value(0));
+ request.returnValues().add(new StringValue("OK"));
+ }
+
+ }
+ }
}
}
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
index bd9a49c2fe2..cb623437dba 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
@@ -3,6 +3,7 @@ package com.yahoo.config.subscription.impl;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Supervisor;
import com.yahoo.vespa.config.ConfigPayload;
import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
@@ -96,6 +97,11 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co
return numSpecs;
}
+ @Override
+ public Supervisor getSupervisor() {
+ return null;
+ }
+
public int getNumberOfRequests() {
return numberOfRequests;
}
diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
index 8ea2800e65e..5a6f8a8848b 100644
--- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
+++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config;
+import com.yahoo.jrt.Supervisor;
+
/**
* @author hmusum
*/
@@ -15,4 +17,6 @@ public interface ConnectionPool {
Connection setNewCurrentConnection();
int getSize();
+
+ Supervisor getSupervisor();
}
diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
index bb8f7e9f9ce..efeaacf225b 100644
--- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
+++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
* @author hmusum
*/
public class JRTConnectionPool implements ConnectionPool {
+
private static final Logger log = Logger.getLogger(JRTConnectionPool.class.getName());
private final Supervisor supervisor = new Supervisor(new Transport());
@@ -150,4 +151,9 @@ public class JRTConnectionPool implements ConnectionPool {
}
}
+ @Override
+ public Supervisor getSupervisor() {
+ return supervisor;
+ }
+
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java
index 79c541d7b1a..0d1aae97690 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java
@@ -15,9 +15,7 @@ public class ApplicationFileManager implements AddFileInterface {
@Override
public FileReference addFile(String relativePath, FileReference reference) {
- // TODO Wire in when verified in system test
- // return master.addFile(new File(applicationDir, relativePath), reference);
- return reference;
+ return master.addFile(new File(applicationDir, relativePath), reference);
}
@Override