diff options
author | Harald Musum <musum@oath.com> | 2017-11-02 14:46:55 +0100 |
---|---|---|
committer | Harald Musum <musum@oath.com> | 2017-11-02 14:46:55 +0100 |
commit | 0ade8429cc03ee3bee7800f6b709374c1be0929c (patch) | |
tree | 32277bb9005f7d521993c24b42e071bd9353f44d /config-proxy/src | |
parent | 648a78cd05b269b74cbb819e8d8ed9c7110e8b50 (diff) |
Add file distribution skeleton to config proxy
Diffstat (limited to 'config-proxy/src')
5 files changed, 331 insertions, 74 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index f92e0e04974..1fced0b1e3d 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.config.FileReference; import com.yahoo.jrt.*; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.*; @@ -9,19 +10,20 @@ import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; +import java.io.File; import java.lang.*; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Set; +import java.util.*; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** - * A proxy server that handles RPC config requests. + * An RPC server that handles config and file distribution requests. * * @author hmusum - * @since 5.1 */ +// TODO: Rename now that it also support file distribution request public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); @@ -34,7 +36,8 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) { this.proxyServer = proxyServer; this.spec = spec; - setUp(); + declareConfigMethods(); + declareFileDistributionMethods(); } public void run() { @@ -57,8 +60,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer return spec; } - private void setUp() { - // The getConfig method in this class will handle RPC calls for getting config + private void declareConfigMethods() { supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this, "getConfigV3")); supervisor.addMethod(new Method("ping", "", "i", this, "ping") @@ -103,6 +105,32 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer .returnDesc(0, "ret", "Empty string or error message")); } + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", + this, "getFile") + .methodDesc("wait for file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", + this, "getFile") + .methodDesc("wait for file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", + this, "getActiveFileReferencesStatus") + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", + this, "setFileReferencesToDownload") + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + + //---------------- RPC methods ------------------------------------ + /** * Handles RPC method "config.v3.getConfig" requests. * @@ -118,54 +146,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer } } - private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { - Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); - if (supportedProtocolVersions.contains(request.getProtocolVersion())) { - return true; - } else { - String message = "Illegal protocol version " + request.getProtocolVersion() + - " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported"; - log.log(LogLevel.ERROR, message); - request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message); - } - return false; - } - - private void preHandle(Request req) { - proxyServer.getStatistics().incRpcRequests(); - req.detach(); - req.target().addWatcher(this); - } - - /** - * Handles all versions of "getConfig" requests. - * - * @param request a Request - */ - private void getConfigImpl(JRTServerConfigRequest request) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); - log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5()); - if (!request.validateParameters()) { - // Error code is set in verifyParameters if parameters are not OK. - log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); - returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); - return; - } - try { - RawConfig config = proxyServer.resolveConfig(request); - if (config == null) { - log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); - } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { - returnOkResponse(request, config); - } else { - log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response"); - } - } catch (Exception e) { - e.printStackTrace(); - returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - /** * Returns 0 if server is alive. * @@ -207,14 +187,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringArray(ret)); } - private String printSourceConnections() { - StringBuilder sb = new StringBuilder(); - for (String s : proxyServer.getSourceConnections()) { - sb.append(s).append("\n"); - } - return sb.toString(); - } - @SuppressWarnings({"UnusedDeclaration"}) public final void updateSources(Request req) { String sources = req.parameters().get(0).asString(); @@ -263,6 +235,108 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); } + @SuppressWarnings({"UnusedDeclaration"}) + public final void getFile(Request req) { + // TODO: Detach to avoid holding transport thread + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String pathToFile = proxyServer.fileDownloader() + .getFile(fileReference) + .orElseGet(() -> new File("")) + .getAbsolutePath(); + + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile); + req.returnValues().add(new StringValue(pathToFile)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getActiveFileReferencesStatus(Request req) { + Map<FileReference, Double> downloadStatus = proxyServer.fileDownloader().downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void setFileReferencesToDownload(Request req) { + String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); + List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + .map(FileReference::new) + .collect(Collectors.toList()); + proxyServer.fileDownloader().queueForDownload(fileReferences); + + req.returnValues().add(new Int32Value(0)); + } + + //---------------------------------------------------- + + private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { + Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); + if (supportedProtocolVersions.contains(request.getProtocolVersion())) { + return true; + } else { + String message = "Illegal protocol version " + request.getProtocolVersion() + + " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported"; + log.log(LogLevel.ERROR, message); + request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message); + } + return false; + } + + private void preHandle(Request req) { + proxyServer.getStatistics().incRpcRequests(); + req.detach(); + req.target().addWatcher(this); + } + + /** + * Handles all versions of "getConfig" requests. + * + * @param request a Request + */ + private void getConfigImpl(JRTServerConfigRequest request) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); + log.log(LogLevel.DEBUG, () ->"getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5()); + if (!request.validateParameters()) { + // Error code is set in verifyParameters if parameters are not OK. + log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); + returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); + return; + } + try { + RawConfig config = proxyServer.resolveConfig(request); + if (config == null) { + log.log(LogLevel.SPAM, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); + } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { + returnOkResponse(request, config); + } else { + log.log(LogLevel.SPAM, "No new config for " + request.getShortDescription() + ", not sending response"); + } + } catch (Exception e) { + e.printStackTrace(); + returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + + private String printSourceConnections() { + StringBuilder sb = new StringBuilder(); + for (String s : proxyServer.getSourceConnections()) { + sb.append(s).append("\n"); + } + return sb.toString(); + } + final void listCachedConfig(Request req, boolean full) { String[] ret; MemoryCache cache = proxyServer.getMemoryCache(); diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 74f32f6d052..4ee77beb2d7 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.config.subscription.ConfigSource; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; @@ -12,6 +11,7 @@ import com.yahoo.log.event.Event; import com.yahoo.system.CatchSigTerm; import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; +import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader; import java.util.List; import java.util.concurrent.Executors; @@ -45,7 +45,7 @@ public class ProxyServer implements Runnable { private final ConfigProxyRpcServer rpcServer; final DelayedResponses delayedResponses; - private ConfigSource configSource; + private ConfigSourceSet configSource; private volatile ConfigSourceClient configClient; @@ -55,6 +55,7 @@ public class ProxyServer implements Runnable { private static final double timingValuesRatio = 0.8; private final static TimingValues defaultTimingValues; private final boolean delayedResponseHandling; + private final FileDownloader fileDownloader; private volatile Mode mode = new Mode(DEFAULT); @@ -68,7 +69,7 @@ public class ProxyServer implements Runnable { defaultTimingValues = tv; } - private ProxyServer(Spec spec, DelayedResponses delayedResponses, ConfigSource source, + private ProxyServer(Spec spec, DelayedResponses delayedResponses, ConfigSourceSet source, ConfigProxyStatistics statistics, TimingValues timingValues, boolean delayedResponseHandling, MemoryCache memoryCache, ConfigSourceClient configClient) { @@ -82,17 +83,17 @@ public class ProxyServer implements Runnable { this.rpcServer = createRpcServer(spec); clientUpdater = new ClientUpdater(rpcServer, statistics, delayedResponses); this.configClient = createClient(clientUpdater, delayedResponses, source, timingValues, memoryCache, configClient); + this.fileDownloader = new FileDownloader(source); } static ProxyServer createTestServer(ConfigSourceSet source) { return createTestServer(source, null, new MemoryCache(), new ConfigProxyStatistics()); } - static ProxyServer createTestServer(ConfigSource source, + static ProxyServer createTestServer(ConfigSourceSet source, ConfigSourceClient configSourceClient, MemoryCache memoryCache, - ConfigProxyStatistics statistics) - { + ConfigProxyStatistics statistics) { final boolean delayedResponseHandling = false; return new ProxyServer(null, new DelayedResponses(statistics), source, statistics, defaultTimingValues(), delayedResponseHandling, @@ -153,10 +154,10 @@ public class ProxyServer implements Runnable { } private ConfigSourceClient createClient(ClientUpdater clientUpdater, DelayedResponses delayedResponses, - Object source, TimingValues timingValues, + ConfigSourceSet source, TimingValues timingValues, MemoryCache memoryCache, ConfigSourceClient client) { return (client == null) - ? new RpcConfigSourceClient((ConfigSourceSet) source, clientUpdater, memoryCache, timingValues, delayedResponses) + ? new RpcConfigSourceClient(source, clientUpdater, memoryCache, timingValues, delayedResponses) : client; } @@ -165,7 +166,7 @@ public class ProxyServer implements Runnable { } private RpcConfigSourceClient createRpcClient() { - return new RpcConfigSourceClient((ConfigSourceSet) configSource, clientUpdater, memoryCache, timingValues, delayedResponses); + return new RpcConfigSourceClient(configSource, clientUpdater, memoryCache, timingValues, delayedResponses); } private void setupSigTermHandler() { @@ -279,4 +280,8 @@ public class ProxyServer implements Runnable { flush(); configClient = createRpcClient(); } + + FileDownloader fileDownloader() { + return fileDownloader; + } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java new file mode 100644 index 00000000000..9074527e4e4 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloader.java @@ -0,0 +1,98 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.google.common.collect.ImmutableSet; +import com.yahoo.config.FileReference; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.defaults.Defaults; + +import java.io.File; +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import java.util.logging.Logger; + +/** + * Keeps track of files to download and download status + * + * @author hmusum + */ +public class FileDownloader { + private final static Logger log = Logger.getLogger(FileDownloader.class.getName()); + + + private final String filesDirectory; + private final ConfigSourceSet configSourceSet; + private final Duration timeout; + private final Map<FileReference, Double> downloadStatus = new HashMap<>(); + private final Set<FileReference> queuedForDownload = new LinkedHashSet<>(); + + public FileDownloader(ConfigSourceSet configSourceSet) { + this(configSourceSet, + Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution"), + Duration.ofMinutes(15)); + } + + FileDownloader(ConfigSourceSet configSourceSet, String filesDirectory, Duration timeout) { + this.configSourceSet = configSourceSet; + this.filesDirectory = filesDirectory; + this.timeout = timeout; + } + + public Optional<File> getFile(FileReference fileReference) { + Objects.requireNonNull(fileReference, "file reference cannot be null"); + File directory = new File(filesDirectory, fileReference.value()); // directory with one file + + log.log(LogLevel.DEBUG, "Checking if there is a file in '" + directory.getAbsolutePath() + "' "); + Instant end = Instant.now().plus(timeout); + do { + File[] files = directory.listFiles(); + if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { + if (files.length != 1) { + throw new RuntimeException("More than one file in '" + fileReference.value() + + "', expected only one, unable to proceed"); + } + File file = files[0]; + if (!file.exists()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "' does not exist"); + } else if (!file.canRead()) { + throw new RuntimeException("File with reference '" + fileReference.value() + + "'exists, but unable to read it"); + } else { + downloadStatus.put(fileReference, 100.0); + return Optional.of(file); + } + } else { + queueForDownload(fileReference); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } while (Instant.now().isBefore(end)); + + return Optional.empty(); + } + + public Map<FileReference, Double> downloadStatus() { + return downloadStatus; + } + + public void queueForDownload(List<FileReference> fileReferences) { + fileReferences.forEach(this::queueForDownload); + } + + private void queueForDownload(FileReference fileReference) { + log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download "); + queuedForDownload.add(fileReference); + downloadStatus.put(fileReference, 0.0); + } + + ImmutableSet<FileReference> queuedForDownload() { + return ImmutableSet.copyOf(queuedForDownload); + } + +}
\ No newline at end of file diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java index 52de4d1942a..dc9a3408510 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSource.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.config.subscription.ConfigSource; +import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.RawConfig; @@ -14,7 +15,7 @@ import java.util.HashMap; * @author hmusum * @since 5.1.10 */ -class MockConfigSource implements ConfigSource { +class MockConfigSource extends ConfigSourceSet { private final HashMap<ConfigKey<?>, RawConfig> backing = new HashMap<>(); private final ClientUpdater clientUpdater; diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java new file mode 100644 index 00000000000..ea880e451b6 --- /dev/null +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -0,0 +1,79 @@ +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.io.IOUtils; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FileDownloaderTest { + private static final ConfigSourceSet configSourceSet = new ConfigSourceSet(); + + @Test + public void download() throws IOException { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200)); + + // Write a file to download directory to simulate download going OK + String fileReferenceString = "somehash"; + String fileName = "foo.jar"; + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString); + FileReference fileReference = writeFileReference(downloadDir, fileReferenceString, fileName); + + // Check that we get correct path and content when asking for file reference + Optional<File> pathToFile = fileDownloader.getFile(fileReference); + assertTrue(pathToFile.isPresent()); + String downloadedFile = new File(fileReferenceFullPath, fileName).getAbsolutePath(); + assertEquals(new File(fileReferenceFullPath, fileName).getAbsolutePath(), downloadedFile); + assertEquals("content", IOUtils.readFile(pathToFile.get())); + + // Verify download status + Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus(); + assertEquals(1, downloadStatus.size()); + assertDownloadStatus(Collections.singletonList(fileReference), downloadStatus.entrySet().iterator().next(), 100.0); + + // Non-existing file + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(new FileReference("doesnotexist")).isPresent()); + } + + @Test + public void setFilesToDownload() throws IOException { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + FileDownloader fileDownloader = new FileDownloader(configSourceSet, downloadDir.getAbsolutePath(), Duration.ofMillis(200)); + List<FileReference> fileReferences = Arrays.asList(new FileReference("foo"), new FileReference("bar")); + fileDownloader.queueForDownload(fileReferences); + + assertEquals(fileReferences, fileDownloader.queuedForDownload().asList()); + + // Verify download status + Map<FileReference, Double> downloadStatus = fileDownloader.downloadStatus(); + assertEquals(2, downloadStatus.size()); + + assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0); + assertDownloadStatus(fileReferences, downloadStatus.entrySet().iterator().next(), 0.0); + } + + private FileReference writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { + File file = new File(new File(dir, fileReferenceString), fileName); + IOUtils.writeFile(file, "content", false); + return new FileReference(fileReferenceString); + } + + private File fileReferenceFullPath(File dir, String fileReferenceString) { + return new File(dir, fileReferenceString); + } + + private void assertDownloadStatus(List<FileReference> fileReferences, Map.Entry<FileReference, Double> entry, double expectedDownloadStatus) { + assertTrue(fileReferences.contains(new FileReference(entry.getKey().value()))); + assertEquals(expectedDownloadStatus, entry.getValue(), 0.0001); + } +} |