aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java202
1 files changed, 138 insertions, 64 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index f92e0e04974..1fced0b1e3d 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
+import com.yahoo.config.FileReference;
import com.yahoo.jrt.*;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.*;
@@ -9,19 +10,20 @@ import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
+import java.io.File;
import java.lang.*;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
- * A proxy server that handles RPC config requests.
+ * An RPC server that handles config and file distribution requests.
*
* @author hmusum
- * @since 5.1
*/
+// TODO: Rename now that it also support file distribution request
public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
@@ -34,7 +36,8 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) {
this.proxyServer = proxyServer;
this.spec = spec;
- setUp();
+ declareConfigMethods();
+ declareFileDistributionMethods();
}
public void run() {
@@ -57,8 +60,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
return spec;
}
- private void setUp() {
- // The getConfig method in this class will handle RPC calls for getting config
+ private void declareConfigMethods() {
supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this, "getConfigV3"));
supervisor.addMethod(new Method("ping", "", "i",
this, "ping")
@@ -103,6 +105,32 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
.returnDesc(0, "ret", "Empty string or error message"));
}
+ private void declareFileDistributionMethods() {
+ // Legacy method, needs to be the same name as used in filedistributor
+ supervisor.addMethod(new Method("waitFor", "s", "s",
+ this, "getFile")
+ .methodDesc("wait for file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getFile", "s", "s",
+ this, "getFile")
+ .methodDesc("wait for file reference")
+ .paramDesc(0, "file reference", "file reference")
+ .returnDesc(0, "path", "path to file"));
+ supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD",
+ this, "getActiveFileReferencesStatus")
+ .methodDesc("download status for file references")
+ .returnDesc(0, "file references", "array of file references")
+ .returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
+ supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i",
+ this, "setFileReferencesToDownload")
+ .methodDesc("set which file references to download")
+ .paramDesc(0, "file references", "file reference to download")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise"));
+ }
+
+ //---------------- RPC methods ------------------------------------
+
/**
* Handles RPC method "config.v3.getConfig" requests.
*
@@ -118,54 +146,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
}
}
- private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
- Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions();
- if (supportedProtocolVersions.contains(request.getProtocolVersion())) {
- return true;
- } else {
- String message = "Illegal protocol version " + request.getProtocolVersion() +
- " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported";
- log.log(LogLevel.ERROR, message);
- request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message);
- }
- return false;
- }
-
- private void preHandle(Request req) {
- proxyServer.getStatistics().incRpcRequests();
- req.detach();
- req.target().addWatcher(this);
- }
-
- /**
- * Handles all versions of "getConfig" requests.
- *
- * @param request a Request
- */
- private void getConfigImpl(JRTServerConfigRequest request) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
- log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5());
- if (!request.validateParameters()) {
- // Error code is set in verifyParameters if parameters are not OK.
- log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
- returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
- return;
- }
- try {
- RawConfig config = proxyServer.resolveConfig(request);
- if (config == null) {
- log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
- } else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
- returnOkResponse(request, config);
- } else {
- log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response");
- }
- } catch (Exception e) {
- e.printStackTrace();
- returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
-
/**
* Returns 0 if server is alive.
*
@@ -207,14 +187,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
req.returnValues().add(new StringArray(ret));
}
- private String printSourceConnections() {
- StringBuilder sb = new StringBuilder();
- for (String s : proxyServer.getSourceConnections()) {
- sb.append(s).append("\n");
- }
- return sb.toString();
- }
-
@SuppressWarnings({"UnusedDeclaration"})
public final void updateSources(Request req) {
String sources = req.parameters().get(0).asString();
@@ -263,6 +235,108 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
}
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getFile(Request req) {
+ // TODO: Detach to avoid holding transport thread
+ FileReference fileReference = new FileReference(req.parameters().get(0).asString());
+ String pathToFile = proxyServer.fileDownloader()
+ .getFile(fileReference)
+ .orElseGet(() -> new File(""))
+ .getAbsolutePath();
+
+ log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile);
+ req.returnValues().add(new StringValue(pathToFile));
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getActiveFileReferencesStatus(Request req) {
+ Map<FileReference, Double> downloadStatus = proxyServer.fileDownloader().downloadStatus();
+
+ String[] fileRefArray = new String[downloadStatus.keySet().size()];
+ fileRefArray = downloadStatus.keySet().stream()
+ .map(FileReference::value)
+ .collect(Collectors.toList())
+ .toArray(fileRefArray);
+
+ double[] downloadStatusArray = new double[downloadStatus.values().size()];
+ int i = 0;
+ for (Double d : downloadStatus.values()) {
+ downloadStatusArray[i++] = d;
+ }
+
+ req.returnValues().add(new StringArray(fileRefArray));
+ req.returnValues().add(new DoubleArray(downloadStatusArray));
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void setFileReferencesToDownload(Request req) {
+ String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
+ List<FileReference> fileReferences = Stream.of(fileReferenceStrings)
+ .map(FileReference::new)
+ .collect(Collectors.toList());
+ proxyServer.fileDownloader().queueForDownload(fileReferences);
+
+ req.returnValues().add(new Int32Value(0));
+ }
+
+ //----------------------------------------------------
+
+ private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
+ Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions();
+ if (supportedProtocolVersions.contains(request.getProtocolVersion())) {
+ return true;
+ } else {
+ String message = "Illegal protocol version " + request.getProtocolVersion() +
+ " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported";
+ log.log(LogLevel.ERROR, message);
+ request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message);
+ }
+ return false;
+ }
+
+ private void preHandle(Request req) {
+ proxyServer.getStatistics().incRpcRequests();
+ req.detach();
+ req.target().addWatcher(this);
+ }
+
+ /**
+ * Handles all versions of "getConfig" requests.
+ *
+ * @param request a Request
+ */
+ private void getConfigImpl(JRTServerConfigRequest request) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
+ log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5());
+ if (!request.validateParameters()) {
+ // Error code is set in verifyParameters if parameters are not OK.
+ log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
+ returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ return;
+ }
+ try {
+ RawConfig config = proxyServer.resolveConfig(request);
+ if (config == null) {
+ log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
+ } else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
+ returnOkResponse(request, config);
+ } else {
+ log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+
+ private String printSourceConnections() {
+ StringBuilder sb = new StringBuilder();
+ for (String s : proxyServer.getSourceConnections()) {
+ sb.append(s).append("\n");
+ }
+ return sb.toString();
+ }
+
final void listCachedConfig(Request req, boolean full) {
String[] ret;
MemoryCache cache = proxyServer.getMemoryCache();