summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2019-07-08 09:53:37 +0200
committerHarald Musum <musum@verizonmedia.com>2019-07-08 09:53:37 +0200
commit039f8dc3b73f4ec08ebaff79bf625590fa971006 (patch)
treed211a410d30741cbdff7fad905fbe76203ada2de /config-proxy
parent9bd9d9828b467b7b4b4b7542995fcb6ad36b6358 (diff)
Move file distribution rpc server to config-proxy
Only used in one place, just RPC stuff, so move to config-proxy module
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java3
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java137
2 files changed, 137 insertions, 3 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java
index 4eef3c40df4..0b7de6ed562 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java
@@ -4,11 +4,8 @@ package com.yahoo.vespa.config.proxy.filedistribution;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Supervisor;
import com.yahoo.vespa.config.JRTConnectionPool;
-import com.yahoo.vespa.filedistribution.FileDistributionRpcServer;
import com.yahoo.vespa.filedistribution.FileDownloader;
-import java.util.stream.Stream;
-
/**
* Keeps track of file distribution and url download rpc servers.
*
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
new file mode 100644
index 00000000000..33a8ed405a9
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -0,0 +1,137 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy.filedistribution;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.FileReference;
+import com.yahoo.jrt.DoubleArray;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringArray;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.filedistribution.FileDownloader;
+import com.yahoo.vespa.filedistribution.FileReferenceDownload;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * An RPC server that handles file distribution requests.
+ *
+ * @author hmusum
+ */
+public class FileDistributionRpcServer {
+
+ private final static Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName());
+
+ private final Supervisor supervisor;
+ private final FileDownloader downloader;
+ private final ExecutorService rpcDownloadExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+ new DaemonThreadFactory("Rpc executor"));
+
+ public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) {
+ this.supervisor = supervisor;
+ this.downloader = downloader;
+ declareFileDistributionMethods();
+ }
+
+ public void close() {
+ rpcDownloadExecutor.shutdownNow();
+ try {
+ rpcDownloadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void declareFileDistributionMethods() {
+ // Legacy method, needs to be the same name as used in filedistributor
+ supervisor.addMethod(new Method("waitFor", "s", "s", this::getFile)
+ .methodDesc("get path to file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", this::getFile)
+ .methodDesc("get path to file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", this::getActiveFileReferencesStatus)
+ .methodDesc("download status for file references")
+ .returnDesc(0, "file references", "array of file references")
+ .returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
+ supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload)
+ .methodDesc("set which file references to download")
+ .paramDesc(0, "file references", "file reference to download")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ }
+
+
+ //---------------- RPC methods ------------------------------------
+ // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call?
+ private static final int baseErrorCode = 0x10000;
+ private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000;
+
+ private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode;
+ private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1;
+ private static final int fileReferenceInternalError = fileReferenceRemoved + 1;
+
+ private void getFile(Request req) {
+ req.detach();
+ rpcDownloadExecutor.execute(() -> downloadFile(req));
+ }
+
+ private void getActiveFileReferencesStatus(Request req) {
+ Map<FileReference, Double> downloadStatus = downloader.downloadStatus();
+
+ String[] fileRefArray = new String[downloadStatus.keySet().size()];
+ fileRefArray = downloadStatus.keySet().stream()
+ .map(FileReference::value)
+ .collect(Collectors.toList())
+ .toArray(fileRefArray);
+
+ double[] downloadStatusArray = new double[downloadStatus.values().size()];
+ int i = 0;
+ for (Double d : downloadStatus.values()) {
+ downloadStatusArray[i++] = d;
+ }
+
+ req.returnValues().add(new StringArray(fileRefArray));
+ req.returnValues().add(new DoubleArray(downloadStatusArray));
+ }
+
+ private void setFileReferencesToDownload(Request req) {
+ log.log(LogLevel.DEBUG, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+ Arrays.stream(req.parameters().get(0).asStringArray())
+ .map(FileReference::new)
+ .forEach(fileReference -> downloader.downloadIfNeeded(new FileReferenceDownload(fileReference)));
+ req.returnValues().add(new Int32Value(0));
+ }
+
+ private void downloadFile(Request req) {
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ log.log(LogLevel.DEBUG, () -> "getFile() called for file reference '" + fileReference.value() + "'");
+ Optional<File> pathToFile = downloader.getFile(fileReference);
+ try {
+ if (pathToFile.isPresent()) {
+ req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath()));
+ log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' available at " + pathToFile.get());
+ } else {
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error");
+ req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found");
+ }
+ } catch (Throwable e) {
+ log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage());
+ req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed");
+ }
+ req.returnRequest();
+ }
+
+}