diff options
Diffstat (limited to 'config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java')
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java | 202 |
1 files changed, 138 insertions, 64 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index f92e0e04974..1fced0b1e3d 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.config.FileReference; import com.yahoo.jrt.*; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.*; @@ -9,19 +10,20 @@ import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; +import java.io.File; import java.lang.*; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Set; +import java.util.*; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** - * A proxy server that handles RPC config requests. + * An RPC server that handles config and file distribution requests. * * @author hmusum - * @since 5.1 */ +// TODO: Rename now that it also support file distribution request public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); @@ -34,7 +36,8 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) { this.proxyServer = proxyServer; this.spec = spec; - setUp(); + declareConfigMethods(); + declareFileDistributionMethods(); } public void run() { @@ -57,8 +60,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer return spec; } - private void setUp() { - // The getConfig method in this class will handle RPC calls for getting config + private void declareConfigMethods() { supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this, "getConfigV3")); supervisor.addMethod(new Method("ping", "", "i", this, "ping") @@ -103,6 +105,32 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer .returnDesc(0, "ret", "Empty string or error message")); } + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", + this, "getFile") + .methodDesc("wait for file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", + this, "getFile") + .methodDesc("wait for file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", + this, "getActiveFileReferencesStatus") + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", + this, "setFileReferencesToDownload") + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + + //---------------- RPC methods ------------------------------------ + /** * Handles RPC method "config.v3.getConfig" requests. * @@ -118,54 +146,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer } } - private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { - Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); - if (supportedProtocolVersions.contains(request.getProtocolVersion())) { - return true; - } else { - String message = "Illegal protocol version " + request.getProtocolVersion() + - " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported"; - log.log(LogLevel.ERROR, message); - request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message); - } - return false; - } - - private void preHandle(Request req) { - proxyServer.getStatistics().incRpcRequests(); - req.detach(); - req.target().addWatcher(this); - } - - /** - * Handles all versions of "getConfig" requests. - * - * @param request a Request - */ - private void getConfigImpl(JRTServerConfigRequest request) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); - log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5()); - if (!request.validateParameters()) { - // Error code is set in verifyParameters if parameters are not OK. - log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); - returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); - return; - } - try { - RawConfig config = proxyServer.resolveConfig(request); - if (config == null) { - log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); - } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { - returnOkResponse(request, config); - } else { - log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response"); - } - } catch (Exception e) { - e.printStackTrace(); - returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - /** * Returns 0 if server is alive. * @@ -207,14 +187,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringArray(ret)); } - private String printSourceConnections() { - StringBuilder sb = new StringBuilder(); - for (String s : proxyServer.getSourceConnections()) { - sb.append(s).append("\n"); - } - return sb.toString(); - } - @SuppressWarnings({"UnusedDeclaration"}) public final void updateSources(Request req) { String sources = req.parameters().get(0).asString(); @@ -263,6 +235,108 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); } + @SuppressWarnings({"UnusedDeclaration"}) + public final void getFile(Request req) { + // TODO: Detach to avoid holding transport thread + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String pathToFile = proxyServer.fileDownloader() + .getFile(fileReference) + .orElseGet(() -> new File("")) + .getAbsolutePath(); + + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile); + req.returnValues().add(new StringValue(pathToFile)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getActiveFileReferencesStatus(Request req) { + Map<FileReference, Double> downloadStatus = proxyServer.fileDownloader().downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void setFileReferencesToDownload(Request req) { + String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); + List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + .map(FileReference::new) + .collect(Collectors.toList()); + proxyServer.fileDownloader().queueForDownload(fileReferences); + + req.returnValues().add(new Int32Value(0)); + } + + //---------------------------------------------------- + + private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { + Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); + if (supportedProtocolVersions.contains(request.getProtocolVersion())) { + return true; + } else { + String message = "Illegal protocol version " + request.getProtocolVersion() + + " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported"; + log.log(LogLevel.ERROR, message); + request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message); + } + return false; + } + + private void preHandle(Request req) { + proxyServer.getStatistics().incRpcRequests(); + req.detach(); + req.target().addWatcher(this); + } + + /** + * Handles all versions of "getConfig" requests. + * + * @param request a Request + */ + private void getConfigImpl(JRTServerConfigRequest request) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); + log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5()); + if (!request.validateParameters()) { + // Error code is set in verifyParameters if parameters are not OK. + log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); + returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); + return; + } + try { + RawConfig config = proxyServer.resolveConfig(request); + if (config == null) { + log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); + } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { + returnOkResponse(request, config); + } else { + log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response"); + } + } catch (Exception e) { + e.printStackTrace(); + returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + + private String printSourceConnections() { + StringBuilder sb = new StringBuilder(); + for (String s : proxyServer.getSourceConnections()) { + sb.append(s).append("\n"); + } + return sb.toString(); + } + final void listCachedConfig(Request req, boolean full) { String[] ret; MemoryCache cache = proxyServer.getMemoryCache(); |