diff options
author | Harald Musum <musum@verizonmedia.com> | 2019-07-08 09:53:37 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2019-07-08 09:53:37 +0200 |
commit | 039f8dc3b73f4ec08ebaff79bf625590fa971006 (patch) | |
tree | d211a410d30741cbdff7fad905fbe76203ada2de /config-proxy/src/main | |
parent | 9bd9d9828b467b7b4b4b7542995fcb6ad36b6358 (diff) |
Move file distribution rpc server to config-proxy
Only used in one place, just RPC stuff, so move to config-proxy module
Diffstat (limited to 'config-proxy/src/main')
2 files changed, 137 insertions, 3 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java index 4eef3c40df4..0b7de6ed562 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionAndUrlDownload.java @@ -4,11 +4,8 @@ package com.yahoo.vespa.config.proxy.filedistribution; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Supervisor; import com.yahoo.vespa.config.JRTConnectionPool; -import com.yahoo.vespa.filedistribution.FileDistributionRpcServer; import com.yahoo.vespa.filedistribution.FileDownloader; -import java.util.stream.Stream; - /** * Keeps track of file distribution and url download rpc servers. * diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java new file mode 100644 index 00000000000..33a8ed405a9 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java @@ -0,0 +1,137 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.FileReference; +import com.yahoo.jrt.DoubleArray; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileReferenceDownload; + +import java.io.File; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * An RPC server that handles file distribution requests. + * + * @author hmusum + */ +public class FileDistributionRpcServer { + + private final static Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName()); + + private final Supervisor supervisor; + private final FileDownloader downloader; + private final ExecutorService rpcDownloadExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + new DaemonThreadFactory("Rpc executor")); + + public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) { + this.supervisor = supervisor; + this.downloader = downloader; + declareFileDistributionMethods(); + } + + public void close() { + rpcDownloadExecutor.shutdownNow(); + try { + rpcDownloadExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", this::getFile) + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", this::getFile) + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", this::getActiveFileReferencesStatus) + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload) + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + + + //---------------- RPC methods ------------------------------------ + // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? + private static final int baseErrorCode = 0x10000; + private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + private static final int fileReferenceInternalError = fileReferenceRemoved + 1; + + private void getFile(Request req) { + req.detach(); + rpcDownloadExecutor.execute(() -> downloadFile(req)); + } + + private void getActiveFileReferencesStatus(Request req) { + Map<FileReference, Double> downloadStatus = downloader.downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + private void setFileReferencesToDownload(Request req) { + log.log(LogLevel.DEBUG, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + Arrays.stream(req.parameters().get(0).asStringArray()) + .map(FileReference::new) + .forEach(fileReference -> downloader.downloadIfNeeded(new FileReferenceDownload(fileReference))); + req.returnValues().add(new Int32Value(0)); + } + + private void downloadFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, () -> "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = downloader.getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.DEBUG, () -> "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exception: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + +} |