diff options
author | Harald Musum <musum@yahooinc.com> | 2022-07-05 09:58:47 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-07-05 09:58:47 +0200 |
commit | 639dc28266e8b2cf08c7f93f19ccc9275888896c (patch) | |
tree | a860fa9f5b4cc26e17ab10ebc0fe30443db40e1c /configserver/src/main/java/com/yahoo/vespa/config/server | |
parent | f2802027f402d5b16f5fffb9cfc5a3d7f7401ab4 (diff) |
Support specifying compression type for file distribution requests
Add accepted compression types in file distribution requests.
Older clients (e.g. Vespa 7.x) will not have that in requests,
so fallback to gzip in that case.
Diffstat (limited to 'configserver/src/main/java/com/yahoo/vespa/config/server')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java | 35 | ||||
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java | 26 |
2 files changed, 42 insertions, 19 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 3fe727c6f9d..f0c34e83713 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -29,6 +29,7 @@ import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -36,6 +37,9 @@ import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; public class FileServer { @@ -114,17 +118,14 @@ public class FileServer { FileDirectory getRootDir() { return root; } - void startFileServing(FileReference fileReference, Receiver target) { - if (root.getFile(fileReference).exists()) - serveFile(fileReference, target); - } + void startFileServing(FileReference reference, Receiver target, Set<CompressionType> acceptedCompressionTypes) { + if ( ! root.getFile(reference).exists()) return; - private void serveFile(FileReference reference, Receiver target) { File file = root.getFile(reference); log.log(Level.FINE, () -> "Start serving " + reference + " with file '" + file.getAbsolutePath() + "'"); FileReferenceData fileData = EmptyFileReferenceData.empty(reference, file.getName()); try { - fileData = readFileReferenceData(reference); + fileData = readFileReferenceData(reference, acceptedCompressionTypes); target.receive(fileData, new ReplayStatus(0, "OK")); log.log(Level.FINE, () -> "Done serving " + reference.value() + " with file '" + file.getAbsolutePath() + "'"); } catch (IOException e) { @@ -138,19 +139,23 @@ public class FileServer { } } - private FileReferenceData readFileReferenceData(FileReference reference) throws IOException { + private FileReferenceData readFileReferenceData(FileReference reference, Set<CompressionType> acceptedCompressionTypes) throws IOException { File file = root.getFile(reference); if (file.isDirectory()) { Path tempFile = Files.createTempFile("filereferencedata", reference.value()); - File compressedFile = new FileReferenceCompressor(compressed).compress(file.getParentFile(), tempFile.toFile()); + CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes); + File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile()); return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile); } else { return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file); } } - public void serveFile(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound, Request request, Receiver receiver) { + public void serveFile(FileReference fileReference, + boolean downloadFromOtherSourceIfNotFound, + Set<CompressionType> acceptedCompressionTypes, + Request request, Receiver receiver) { if (executor instanceof ThreadPoolExecutor) log.log(Level.FINE, () -> "Active threads: " + ((ThreadPoolExecutor) executor).getActiveCount()); @@ -158,7 +163,7 @@ public class FileServer { Instant deadline = Instant.now().plus(timeout); String client = request.target().toString(); executor.execute(() -> { - var result = serveFileInternal(fileReference, downloadFromOtherSourceIfNotFound, client, receiver, deadline); + var result = serveFileInternal(fileReference, downloadFromOtherSourceIfNotFound, client, receiver, deadline, acceptedCompressionTypes); request.returnValues() .add(new Int32Value(result.getCode())) .add(new StringValue(result.getDescription())); @@ -170,7 +175,8 @@ public class FileServer { boolean downloadFromOtherSourceIfNotFound, String client, Receiver receiver, - Instant deadline) { + Instant deadline, + Set<CompressionType> acceptedCompressionTypes) { if (Instant.now().isAfter(deadline)) { log.log(Level.INFO, () -> "Deadline exceeded for request for file reference '" + fileReference + "' from " + client); return FileApiErrorCodes.TIMEOUT; @@ -180,7 +186,7 @@ public class FileServer { try { var fileReferenceDownload = new FileReferenceDownload(fileReference, client, downloadFromOtherSourceIfNotFound); fileExists = hasFileDownloadIfNeeded(fileReferenceDownload); - if (fileExists) startFileServing(fileReference, receiver); + if (fileExists) startFileServing(fileReference, receiver, acceptedCompressionTypes); } catch (IllegalArgumentException e) { fileExists = false; log.warning("Failed serving file reference '" + fileReference + "', request from " + client + " failed with: " + e.getMessage()); @@ -189,6 +195,11 @@ public class FileServer { return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND); } + // TODO: Use lz4 for testing only, add zstd when we have support for (de)compressing zstd input and output streams + private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) { + return acceptedCompressionTypes.contains(lz4) ? lz4 : gzip; + } + boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) { FileReference fileReference = fileReferenceDownload.fileReference(); if (hasFile(fileReference)) return true; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 6e919ae7f2e..687cb1d3cca 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.rpc; -import com.yahoo.component.annotation.Inject; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.component.Version; +import com.yahoo.component.annotation.Inject; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.config.FileReference; import com.yahoo.config.provision.ApplicationId; @@ -44,13 +44,14 @@ import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReceiver; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; - import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -62,8 +63,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; + /** * An RPC server class that handles the config protocol RPC method "getConfigV3". * Mandatory hooks need to be implemented by subclasses. @@ -221,7 +225,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { getSupervisor().addMethod(new Method("printStatistics", "", "s", this::printStatistics) .methodDesc("printStatistics") .returnDesc(0, "statistics", "Statistics for server")); - getSupervisor().addMethod(new Method("filedistribution.serveFile", "si", "is", this::serveFile)); + getSupervisor().addMethod(new Method("filedistribution.serveFile", "si*", "is", this::serveFile)); getSupervisor().addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload) .methodDesc("set which file references to download") .paramDesc(0, "file references", "file reference to download") @@ -566,10 +570,18 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { rpcAuthorizer.authorizeFileRequest(request) .thenRun(() -> { // okay to do in authorizer thread as serveFile is async FileServer.Receiver receiver = new ChunkedFileReceiver(request.target()); - fileServer.serveFile(new FileReference(request.parameters().get(0).asString()), - request.parameters().get(1).asInt32() == 0, - request, - receiver); + + FileReference reference = new FileReference(request.parameters().get(0).asString()); + boolean downloadFromOtherSourceIfNotFound = request.parameters().get(1).asInt32() == 0; + Set<FileReferenceData.CompressionType> acceptedCompressionTypes = Set.of(CompressionType.gzip); + // Newer clients specify accepted compression types in request + if (request.parameters().size() > 2) + acceptedCompressionTypes = Arrays.stream(request.parameters().get(2).asStringArray()) + .map(CompressionType::valueOf) + .collect(Collectors.toSet()); + log.log(Level.FINE, "acceptedCompressionTypes=" + acceptedCompressionTypes); + + fileServer.serveFile(reference, downloadFromOtherSourceIfNotFound, acceptedCompressionTypes, request, receiver); }); } |