aboutsummaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com/yahoo/vespa/config/server
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-07-05 09:58:47 +0200
committerHarald Musum <musum@yahooinc.com>2022-07-05 09:58:47 +0200
commit639dc28266e8b2cf08c7f93f19ccc9275888896c (patch)
treea860fa9f5b4cc26e17ab10ebc0fe30443db40e1c /configserver/src/main/java/com/yahoo/vespa/config/server
parentf2802027f402d5b16f5fffb9cfc5a3d7f7401ab4 (diff)
Support specifying compression type for file distribution requests
Add accepted compression types in file distribution requests. Older clients (e.g. Vespa 7.x) will not have that in requests, so fallback to gzip in that case.
Diffstat (limited to 'configserver/src/main/java/com/yahoo/vespa/config/server')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java35
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java26
2 files changed, 42 insertions, 19 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index 3fe727c6f9d..f0c34e83713 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -29,6 +29,7 @@ import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@@ -36,6 +37,9 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
public class FileServer {
@@ -114,17 +118,14 @@ public class FileServer {
FileDirectory getRootDir() { return root; }
- void startFileServing(FileReference fileReference, Receiver target) {
- if (root.getFile(fileReference).exists())
- serveFile(fileReference, target);
- }
+ void startFileServing(FileReference reference, Receiver target, Set<CompressionType> acceptedCompressionTypes) {
+ if ( ! root.getFile(reference).exists()) return;
- private void serveFile(FileReference reference, Receiver target) {
File file = root.getFile(reference);
log.log(Level.FINE, () -> "Start serving " + reference + " with file '" + file.getAbsolutePath() + "'");
FileReferenceData fileData = EmptyFileReferenceData.empty(reference, file.getName());
try {
- fileData = readFileReferenceData(reference);
+ fileData = readFileReferenceData(reference, acceptedCompressionTypes);
target.receive(fileData, new ReplayStatus(0, "OK"));
log.log(Level.FINE, () -> "Done serving " + reference.value() + " with file '" + file.getAbsolutePath() + "'");
} catch (IOException e) {
@@ -138,19 +139,23 @@ public class FileServer {
}
}
- private FileReferenceData readFileReferenceData(FileReference reference) throws IOException {
+ private FileReferenceData readFileReferenceData(FileReference reference, Set<CompressionType> acceptedCompressionTypes) throws IOException {
File file = root.getFile(reference);
if (file.isDirectory()) {
Path tempFile = Files.createTempFile("filereferencedata", reference.value());
- File compressedFile = new FileReferenceCompressor(compressed).compress(file.getParentFile(), tempFile.toFile());
+ CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes);
+ File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile());
return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile);
} else {
return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file);
}
}
- public void serveFile(FileReference fileReference, boolean downloadFromOtherSourceIfNotFound, Request request, Receiver receiver) {
+ public void serveFile(FileReference fileReference,
+ boolean downloadFromOtherSourceIfNotFound,
+ Set<CompressionType> acceptedCompressionTypes,
+ Request request, Receiver receiver) {
if (executor instanceof ThreadPoolExecutor)
log.log(Level.FINE, () -> "Active threads: " + ((ThreadPoolExecutor) executor).getActiveCount());
@@ -158,7 +163,7 @@ public class FileServer {
Instant deadline = Instant.now().plus(timeout);
String client = request.target().toString();
executor.execute(() -> {
- var result = serveFileInternal(fileReference, downloadFromOtherSourceIfNotFound, client, receiver, deadline);
+ var result = serveFileInternal(fileReference, downloadFromOtherSourceIfNotFound, client, receiver, deadline, acceptedCompressionTypes);
request.returnValues()
.add(new Int32Value(result.getCode()))
.add(new StringValue(result.getDescription()));
@@ -170,7 +175,8 @@ public class FileServer {
boolean downloadFromOtherSourceIfNotFound,
String client,
Receiver receiver,
- Instant deadline) {
+ Instant deadline,
+ Set<CompressionType> acceptedCompressionTypes) {
if (Instant.now().isAfter(deadline)) {
log.log(Level.INFO, () -> "Deadline exceeded for request for file reference '" + fileReference + "' from " + client);
return FileApiErrorCodes.TIMEOUT;
@@ -180,7 +186,7 @@ public class FileServer {
try {
var fileReferenceDownload = new FileReferenceDownload(fileReference, client, downloadFromOtherSourceIfNotFound);
fileExists = hasFileDownloadIfNeeded(fileReferenceDownload);
- if (fileExists) startFileServing(fileReference, receiver);
+ if (fileExists) startFileServing(fileReference, receiver, acceptedCompressionTypes);
} catch (IllegalArgumentException e) {
fileExists = false;
log.warning("Failed serving file reference '" + fileReference + "', request from " + client + " failed with: " + e.getMessage());
@@ -189,6 +195,11 @@ public class FileServer {
return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND);
}
+ // TODO: Use lz4 for testing only, add zstd when we have support for (de)compressing zstd input and output streams
+ private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) {
+ return acceptedCompressionTypes.contains(lz4) ? lz4 : gzip;
+ }
+
boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
FileReference fileReference = fileReferenceDownload.fileReference();
if (hasFile(fileReference)) return true;
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index 6e919ae7f2e..687cb1d3cca 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -1,9 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.rpc;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.component.Version;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.config.FileReference;
import com.yahoo.config.provision.ApplicationId;
@@ -44,13 +44,14 @@ import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReceiver;
import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
-
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -62,8 +63,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
+
/**
* An RPC server class that handles the config protocol RPC method "getConfigV3".
* Mandatory hooks need to be implemented by subclasses.
@@ -221,7 +225,7 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
getSupervisor().addMethod(new Method("printStatistics", "", "s", this::printStatistics)
.methodDesc("printStatistics")
.returnDesc(0, "statistics", "Statistics for server"));
- getSupervisor().addMethod(new Method("filedistribution.serveFile", "si", "is", this::serveFile));
+ getSupervisor().addMethod(new Method("filedistribution.serveFile", "si*", "is", this::serveFile));
getSupervisor().addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload)
.methodDesc("set which file references to download")
.paramDesc(0, "file references", "file reference to download")
@@ -566,10 +570,18 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
rpcAuthorizer.authorizeFileRequest(request)
.thenRun(() -> { // okay to do in authorizer thread as serveFile is async
FileServer.Receiver receiver = new ChunkedFileReceiver(request.target());
- fileServer.serveFile(new FileReference(request.parameters().get(0).asString()),
- request.parameters().get(1).asInt32() == 0,
- request,
- receiver);
+
+ FileReference reference = new FileReference(request.parameters().get(0).asString());
+ boolean downloadFromOtherSourceIfNotFound = request.parameters().get(1).asInt32() == 0;
+ Set<FileReferenceData.CompressionType> acceptedCompressionTypes = Set.of(CompressionType.gzip);
+ // Newer clients specify accepted compression types in request
+ if (request.parameters().size() > 2)
+ acceptedCompressionTypes = Arrays.stream(request.parameters().get(2).asStringArray())
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toSet());
+ log.log(Level.FINE, "acceptedCompressionTypes=" + acceptedCompressionTypes);
+
+ fileServer.serveFile(reference, downloadFromOtherSourceIfNotFound, acceptedCompressionTypes, request, receiver);
});
}