summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java32
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java87
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java23
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java50
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java6
-rw-r--r--config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java4
-rw-r--r--config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java6
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java4
8 files changed, 146 insertions, 66 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
index 46b1ffc721e..b0fbc7acd33 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -59,18 +59,9 @@ public class FileDistributionRpcServer {
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
- supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
- this, "receiveFile")
- .methodDesc("receive file reference content")
- .paramDesc(0, "file references", "file reference to download")
- .paramDesc(1, "filename", "filename")
- .paramDesc(2, "content", "array of bytes")
- .paramDesc(3, "hash", "xx64hash of the file content")
- .paramDesc(4, "errorcode", "Error code. 0 if none")
- .paramDesc(5, "error-description", "Error description.")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
+
//---------------- RPC methods ------------------------------------
// TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
private static final int baseErrorCode = 0x10000;
@@ -132,24 +123,5 @@ public class FileDistributionRpcServer {
req.returnValues().add(new Int32Value(0));
}
- @SuppressWarnings({"UnusedDeclaration"})
- public final void receiveFile(Request req) {
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- String filename = req.parameters().get(1).asString();
- byte[] content = req.parameters().get(2).asData();
- long xxhash = req.parameters().get(3).asInt64();
- int errorCode = req.parameters().get(4).asInt32();
- String errorDescription = req.parameters().get(5).asString();
-
- if (errorCode == 0) {
- // TODO: Remove when system test works
- log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
- downloader.receiveFile(fileReference, filename, content);
- req.returnValues().add(new Int32Value(0));
- } else {
- log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
- req.returnValues().add(new Int32Value(1));
- // TODO: Add error description return value here too?
- }
- }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
new file mode 100644
index 00000000000..314c90c5853
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
@@ -0,0 +1,87 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+public class FileReceiver {
+
+ private final static Logger log = Logger.getLogger(FileReceiver.class.getName());
+
+ private final Supervisor supervisor;
+ private final FileReferenceDownloader downloader;
+ private final File downloadDirectory;
+
+ public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ this.downloadDirectory = downloadDirectory;
+ registerMethods();
+ }
+
+ private void registerMethods() {
+ supervisor.addMethod(receiveFileMethod(this));
+ }
+
+ // Defined here so that it can be added to supervisor used by client (server will use same connection when calling
+ // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method
+ private Method receiveFileMethod(Object handler) {
+ return new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
+ handler, "receiveFile")
+ .methodDesc("receive file reference content")
+ .paramDesc(0, "file references", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "content", "array of bytes")
+ .paramDesc(3, "hash", "xx64hash of the file content")
+ .paramDesc(4, "errorcode", "Error code. 0 if none")
+ .paramDesc(5, "error-description", "Error description.")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise");
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String filename = req.parameters().get(1).asString();
+ byte[] content = req.parameters().get(2).asData();
+ long xxhash = req.parameters().get(3).asInt64();
+ int errorCode = req.parameters().get(4).asInt32();
+ String errorDescription = req.parameters().get(5).asString();
+
+ if (errorCode == 0) {
+ // TODO: Remove when system test works
+ log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
+ receiveFile(fileReference, filename, content);
+ req.returnValues().add(new Int32Value(0));
+ } else {
+ log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
+ req.returnValues().add(new Int32Value(1));
+ // TODO: Add error description return value here too?
+ }
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content) {
+ File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ File file = new File(fileReferenceDir, filename);
+ log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
+ Files.write(file.toPath(), content);
+ downloader.completedDownloading(fileReference, file);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
+ throw new RuntimeException("Failed writing file: ", e);
+ }
+ }
+
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
index 4b32ffab2b7..eaa791bec29 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -12,8 +12,6 @@ import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -44,7 +42,6 @@ class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(10);
- private final File downloadDirectory;
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
private ExecutorService readFromQueueExecutor =
@@ -54,13 +51,13 @@ class FileReferenceDownloader {
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
+ private final FileReceiver fileReceiver;
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
- this.downloadDirectory = downloadDirectory;
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- if (connectionPool != null)
- readFromQueueExecutor.submit(this::readFromQueue);
+ readFromQueueExecutor.submit(this::readFromQueue);
+ this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
private synchronized Optional<File> startDownload(FileReference fileReference,
@@ -83,17 +80,7 @@ class FileReferenceDownloader {
}
void receiveFile(FileReference fileReference, String filename, byte[] content) {
- File fileReferenceDir = new File(downloadDirectory, fileReference.value());
- try {
- Files.createDirectories(fileReferenceDir.toPath());
- File file = new File(fileReferenceDir, filename);
- log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
- Files.write(file.toPath(), content);
- completedDownloading(fileReference, file);
- } catch (IOException e) {
- log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
- throw new RuntimeException("Failed writing file: ", e);
- }
+ fileReceiver.receiveFile(fileReference, filename, content);
}
synchronized Set<FileReference> queuedDownloads() {
@@ -117,7 +104,7 @@ class FileReferenceDownloader {
} while (true);
}
- private synchronized void completedDownloading(FileReference fileReference, File file) {
+ void completedDownloading(FileReference fileReference, File file) {
if (downloads.containsKey(fileReference))
downloads.get(fileReference).future().set(Optional.of(file));
downloadStatus.put(fileReference, 100.0);
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
index cad3d2d0330..0959919a46d 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java
@@ -19,7 +19,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
@@ -32,14 +31,14 @@ public class FileDownloaderTest {
private MockConnection connection;
private FileDownloader fileDownloader;
+ private File downloadDir;
@Before
public void setup() {
try {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
+ downloadDir = Files.createTempDirectory("filedistribution").toFile();
connection = new MockConnection();
fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000));
- FileDistributionRpcServer rpcServer = new FileDistributionRpcServer(new Supervisor(new Transport()), fileDownloader);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
@@ -110,16 +109,16 @@ public class FileDownloaderTest {
@Test
public void setFilesToDownload() throws IOException {
+ Duration timeout = Duration.ofMillis(200);
File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
+ MockConnection connectionPool = new MockConnection();
+ connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000))));
+ FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout);
FileReference foo = new FileReference("foo");
FileReference bar = new FileReference("bar");
List<FileReference> fileReferences = Arrays.asList(foo, bar);
fileDownloader.queueForDownload(fileReferences);
- // All requested file references should be in queue (since FileDownloader was created without ConnectionPool)
- assertEquals(new LinkedHashSet<>(fileReferences), new LinkedHashSet<>(fileDownloader.queuedDownloads()));
-
// Verify download status
assertDownloadStatus(fileDownloader, foo, 0.0);
assertDownloadStatus(fileDownloader, bar, 0.0);
@@ -127,8 +126,6 @@ public class FileDownloaderTest {
@Test
public void receiveFile() throws IOException {
- File downloadDir = Files.createTempDirectory("filedistribution").toFile();
- FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200));
FileReference foo = new FileReference("foo");
String filename = "foo.jar";
fileDownloader.receiveFile(foo, filename, Utf8.toBytes("content"));
@@ -209,11 +206,20 @@ public class FileDownloaderTest {
return 1;
}
- public void setResponseHandler(ResponseHandler responseHandler) {
+ @Override
+ public Supervisor getSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
+ void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
- static class FileReferenceFoundResponseHandler implements ResponseHandler {
+ public interface ResponseHandler {
+ void request(Request request);
+ }
+
+ static class FileReferenceFoundResponseHandler implements MockConnection.ResponseHandler {
@Override
public void request(Request request) {
@@ -224,7 +230,7 @@ public class FileDownloaderTest {
}
}
- static class UnknownFileReferenceResponseHandler implements ResponseHandler {
+ static class UnknownFileReferenceResponseHandler implements MockConnection.ResponseHandler {
@Override
public void request(Request request) {
@@ -235,12 +241,26 @@ public class FileDownloaderTest {
}
}
- public interface ResponseHandler {
+ static class WaitResponseHandler implements MockConnection.ResponseHandler {
- void request(Request request);
+ private final Duration waitUntilAnswering;
- }
+ WaitResponseHandler(Duration waitUntilAnswering) {
+ super();
+ this.waitUntilAnswering = waitUntilAnswering;
+ }
+
+ @Override
+ public void request(Request request) {
+ try { Thread.sleep(waitUntilAnswering.toMillis());} catch (InterruptedException e) { /* do nothing */ }
+ if (request.methodName().equals("filedistribution.serveFile")) {
+ request.returnValues().add(new Int32Value(0));
+ request.returnValues().add(new StringValue("OK"));
+ }
+
+ }
+ }
}
}
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
index bd9a49c2fe2..cb623437dba 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
@@ -3,6 +3,7 @@ package com.yahoo.config.subscription.impl;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Supervisor;
import com.yahoo.vespa.config.ConfigPayload;
import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
@@ -96,6 +97,11 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co
return numSpecs;
}
+ @Override
+ public Supervisor getSupervisor() {
+ return null;
+ }
+
public int getNumberOfRequests() {
return numberOfRequests;
}
diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
index 8ea2800e65e..5a6f8a8848b 100644
--- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
+++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config;
+import com.yahoo.jrt.Supervisor;
+
/**
* @author hmusum
*/
@@ -15,4 +17,6 @@ public interface ConnectionPool {
Connection setNewCurrentConnection();
int getSize();
+
+ Supervisor getSupervisor();
}
diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
index bb8f7e9f9ce..efeaacf225b 100644
--- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
+++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
* @author hmusum
*/
public class JRTConnectionPool implements ConnectionPool {
+
private static final Logger log = Logger.getLogger(JRTConnectionPool.class.getName());
private final Supervisor supervisor = new Supervisor(new Transport());
@@ -150,4 +151,9 @@ public class JRTConnectionPool implements ConnectionPool {
}
}
+ @Override
+ public Supervisor getSupervisor() {
+ return supervisor;
+ }
+
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java
index 79c541d7b1a..0d1aae97690 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java
@@ -15,9 +15,7 @@ public class ApplicationFileManager implements AddFileInterface {
@Override
public FileReference addFile(String relativePath, FileReference reference) {
- // TODO Wire in when verified in system test
- // return master.addFile(new File(applicationDir, relativePath), reference);
- return reference;
+ return master.addFile(new File(applicationDir, relativePath), reference);
}
@Override