summaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'config-proxy/src/main/java/com')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java32
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java87
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java23
3 files changed, 94 insertions, 48 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
index 46b1ffc721e..b0fbc7acd33 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -59,18 +59,9 @@ public class FileDistributionRpcServer {
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
- supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
- this, "receiveFile")
- .methodDesc("receive file reference content")
- .paramDesc(0, "file references", "file reference to download")
- .paramDesc(1, "filename", "filename")
- .paramDesc(2, "content", "array of bytes")
- .paramDesc(3, "hash", "xx64hash of the file content")
- .paramDesc(4, "errorcode", "Error code. 0 if none")
- .paramDesc(5, "error-description", "Error description.")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
+
//---------------- RPC methods ------------------------------------
// TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
private static final int baseErrorCode = 0x10000;
@@ -132,24 +123,5 @@ public class FileDistributionRpcServer {
req.returnValues().add(new Int32Value(0));
}
- @SuppressWarnings({"UnusedDeclaration"})
- public final void receiveFile(Request req) {
- FileReference fileReference = new FileReference(req.parameters().get(0).asString());
- String filename = req.parameters().get(1).asString();
- byte[] content = req.parameters().get(2).asData();
- long xxhash = req.parameters().get(3).asInt64();
- int errorCode = req.parameters().get(4).asInt32();
- String errorDescription = req.parameters().get(5).asString();
-
- if (errorCode == 0) {
- // TODO: Remove when system test works
- log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
- downloader.receiveFile(fileReference, filename, content);
- req.returnValues().add(new Int32Value(0));
- } else {
- log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
- req.returnValues().add(new Int32Value(1));
- // TODO: Add error description return value here too?
- }
- }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
new file mode 100644
index 00000000000..314c90c5853
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReceiver.java
@@ -0,0 +1,87 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+public class FileReceiver {
+
+ private final static Logger log = Logger.getLogger(FileReceiver.class.getName());
+
+ private final Supervisor supervisor;
+ private final FileReferenceDownloader downloader;
+ private final File downloadDirectory;
+
+ public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ this.downloadDirectory = downloadDirectory;
+ registerMethods();
+ }
+
+ private void registerMethods() {
+ supervisor.addMethod(receiveFileMethod(this));
+ }
+
+ // Defined here so that it can be added to supervisor used by client (server will use same connection when calling
+ // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method
+ private Method receiveFileMethod(Object handler) {
+ return new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing
+ handler, "receiveFile")
+ .methodDesc("receive file reference content")
+ .paramDesc(0, "file references", "file reference to download")
+ .paramDesc(1, "filename", "filename")
+ .paramDesc(2, "content", "array of bytes")
+ .paramDesc(3, "hash", "xx64hash of the file content")
+ .paramDesc(4, "errorcode", "Error code. 0 if none")
+ .paramDesc(5, "error-description", "Error description.")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise");
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void receiveFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String filename = req.parameters().get(1).asString();
+ byte[] content = req.parameters().get(2).asData();
+ long xxhash = req.parameters().get(3).asInt64();
+ int errorCode = req.parameters().get(4).asInt32();
+ String errorDescription = req.parameters().get(5).asString();
+
+ if (errorCode == 0) {
+ // TODO: Remove when system test works
+ log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
+ receiveFile(fileReference, filename, content);
+ req.returnValues().add(new Int32Value(0));
+ } else {
+ log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
+ req.returnValues().add(new Int32Value(1));
+ // TODO: Add error description return value here too?
+ }
+ }
+
+ void receiveFile(FileReference fileReference, String filename, byte[] content) {
+ File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ File file = new File(fileReferenceDir, filename);
+ log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
+ Files.write(file.toPath(), content);
+ downloader.completedDownloading(fileReference, file);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
+ throw new RuntimeException("Failed writing file: ", e);
+ }
+ }
+
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
index 4b32ffab2b7..eaa791bec29 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java
@@ -12,8 +12,6 @@ import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -44,7 +42,6 @@ class FileReferenceDownloader {
private final static Logger log = Logger.getLogger(FileReferenceDownloader.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(10);
- private final File downloadDirectory;
private final ExecutorService downloadExecutor =
Executors.newFixedThreadPool(10, new DaemonThreadFactory("filereference downloader"));
private ExecutorService readFromQueueExecutor =
@@ -54,13 +51,13 @@ class FileReferenceDownloader {
private final Map<FileReference, FileReferenceDownload> downloads = new LinkedHashMap<>();
private final Map<FileReference, Double> downloadStatus = new HashMap<>();
private final Duration downloadTimeout;
+ private final FileReceiver fileReceiver;
FileReferenceDownloader(File downloadDirectory, ConnectionPool connectionPool, Duration timeout) {
- this.downloadDirectory = downloadDirectory;
this.connectionPool = connectionPool;
this.downloadTimeout = timeout;
- if (connectionPool != null)
- readFromQueueExecutor.submit(this::readFromQueue);
+ readFromQueueExecutor.submit(this::readFromQueue);
+ this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory);
}
private synchronized Optional<File> startDownload(FileReference fileReference,
@@ -83,17 +80,7 @@ class FileReferenceDownloader {
}
void receiveFile(FileReference fileReference, String filename, byte[] content) {
- File fileReferenceDir = new File(downloadDirectory, fileReference.value());
- try {
- Files.createDirectories(fileReferenceDir.toPath());
- File file = new File(fileReferenceDir, filename);
- log.log(LogLevel.INFO, "Writing data to " + file.getAbsolutePath());
- Files.write(file.toPath(), content);
- completedDownloading(fileReference, file);
- } catch (IOException e) {
- log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage());
- throw new RuntimeException("Failed writing file: ", e);
- }
+ fileReceiver.receiveFile(fileReference, filename, content);
}
synchronized Set<FileReference> queuedDownloads() {
@@ -117,7 +104,7 @@ class FileReferenceDownloader {
} while (true);
}
- private synchronized void completedDownloading(FileReference fileReference, File file) {
+ void completedDownloading(FileReference fileReference, File file) {
if (downloads.containsKey(fileReference))
downloads.get(fileReference).future().set(Optional.of(file));
downloadStatus.put(fileReference, 100.0);