about summary refs log tree commit diff stats
path: root/filedistribution
diff options
context:
space:
mode:
author Harald Musum <musum@yahooinc.com> 2022-07-05 09:58:47 +0200
committer Harald Musum <musum@yahooinc.com> 2022-07-05 09:58:47 +0200
commit 639dc28266e8b2cf08c7f93f19ccc9275888896c (patch)
tree a860fa9f5b4cc26e17ab10ebc0fe30443db40e1c /filedistribution
parent f2802027f402d5b16f5fffb9cfc5a3d7f7401ab4 (diff)
Support specifying compression type for file distribution requests
Add accepted compression types to file distribution requests. Older clients (e.g. Vespa 7.x) will not include them in their requests, so fall back to gzip in that case.
Diffstat (limited to 'filedistribution')
-rw-r--r-- filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java 27
-rw-r--r-- filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java 26
-rw-r--r-- filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java 1
-rw-r--r-- filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java 5
-rw-r--r-- filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java 23
5 files changed, 63 insertions, 19 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 81a8944149c..65c6dd5931d 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -23,6 +23,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
/**
* When asking for a file reference, this handles RPC callbacks from config server with file data and metadata.
@@ -48,7 +49,8 @@ public class FileReceiver {
private final StreamingXXHash64 hasher;
private final int sessionId;
private final FileReference reference;
- private final FileReferenceData.Type fileType;
+ private final Type fileType;
+ private final CompressionType compressionType;
private final String fileName;
private final long fileSize;
private long currentFileSize;
@@ -58,13 +60,18 @@ public class FileReceiver {
private final File tmpDir;
private final File inprogressFile;
- Session(File downloadDirectory, int sessionId, FileReference reference,
- FileReferenceData.Type fileType, String fileName, long fileSize)
- {
+ Session(File downloadDirectory,
+ int sessionId,
+ FileReference reference,
+ Type fileType,
+ FileReferenceData.CompressionType compressionType,
+ String fileName,
+ long fileSize) {
this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0);
this.sessionId = sessionId;
this.reference = reference;
this.fileType = fileType;
+ this.compressionType = compressionType;
this.fileName = fileName;
this.fileSize = fileSize;
currentFileSize = 0;
@@ -122,7 +129,7 @@ public class FileReceiver {
moveFileToDestination(inprogressFile, file);
} else {
decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile();
- new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir);
+ new FileReferenceCompressor(fileType, compressionType).decompress(inprogressFile, decompressedDir);
moveFileToDestination(decompressedDir, fileReferenceDir);
}
} catch (IOException e) {
@@ -161,11 +168,12 @@ public class FileReceiver {
// receiveFile after getting a serveFile method call). handler needs to implement receiveFile* methods
private List<Method> receiveFileMethod() {
List<Method> methods = new ArrayList<>();
- methods.add(new Method(RECEIVE_META_METHOD, "sssl", "ii", this::receiveFileMeta)
+ methods.add(new Method(RECEIVE_META_METHOD, "sssl*", "ii", this::receiveFileMeta)
.paramDesc(0, "filereference", "file reference to download")
.paramDesc(1, "filename", "filename")
.paramDesc(2, "type", "'file' or 'compressed'")
.paramDesc(3, "filelength", "length in bytes of file")
+ .paramDesc(3, "compressionType", "compression type: gzip, lz4, zstd")
.returnDesc(0, "ret", "0 if success, 1 otherwise")
.returnDesc(1, "session-id", "Session id to be used for this transfer"));
methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", this::receiveFilePart)
@@ -220,8 +228,11 @@ public class FileReceiver {
log.log(Level.FINE, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
FileReference reference = new FileReference(req.parameters().get(0).asString());
String fileName = req.parameters().get(1).asString();
- String type = req.parameters().get(2).asString();
+ Type type = FileReferenceData.Type.valueOf(req.parameters().get(2).asString());
long fileSize = req.parameters().get(3).asInt64();
+ CompressionType compressionType = (req.parameters().size() > 4)
+ ? CompressionType.valueOf(req.parameters().get(4).asString())
+ : CompressionType.gzip; // fallback/legacy compression type
int sessionId = nextSessionId.getAndIncrement();
int retval = 0;
synchronized (sessions) {
@@ -231,7 +242,7 @@ public class FileReceiver {
} else {
try {
sessions.put(sessionId, new Session(downloadDirectory, sessionId, reference,
- FileReferenceData.Type.valueOf(type),fileName, fileSize));
+ type, compressionType, fileName, fileSize));
} catch (Exception e) {
retval = 1;
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
index c36bcd22606..b485e6ded86 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
@@ -2,6 +2,8 @@
package com.yahoo.vespa.filedistribution;
import com.google.common.io.ByteStreams;
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
@@ -35,9 +37,11 @@ public class FileReferenceCompressor {
private static final int recurseDepth = 100;
private final FileReferenceData.Type type;
+ private final FileReferenceData.CompressionType compressionType;
- public FileReferenceCompressor(FileReferenceData.Type type) {
+ public FileReferenceCompressor(FileReferenceData.Type type, FileReferenceData.CompressionType compressionType) {
this.type = Objects.requireNonNull(type, "Type cannot be null");
+ this.compressionType = Objects.requireNonNull(compressionType, "Compression type cannot be null");
}
public File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException {
@@ -114,9 +118,17 @@ public class FileReferenceCompressor {
}
private OutputStream compressedOutputStream(File outputFile) throws IOException {
+ log.log(Level.FINE, () -> "Compressing with type " + type + " and compression type " + compressionType);
switch (type) {
case compressed:
- return new GZIPOutputStream(new FileOutputStream(outputFile));
+ switch (compressionType) {
+ case gzip:
+ return new GZIPOutputStream(new FileOutputStream(outputFile));
+ case lz4:
+ return new LZ4BlockOutputStream(new FileOutputStream(outputFile));
+ default:
+ throw new RuntimeException("Unknown compression type " + compressionType);
+ }
case file:
return new FileOutputStream(outputFile);
default:
@@ -125,9 +137,17 @@ public class FileReferenceCompressor {
}
private InputStream decompressedInputStream(File inputFile) throws IOException {
+ log.log(Level.FINE, () -> "Decompressing with type " + type + " and compression type " + compressionType);
switch (type) {
case compressed:
- return new GZIPInputStream(new FileInputStream(inputFile));
+ switch (compressionType) {
+ case gzip:
+ return new GZIPInputStream(new FileInputStream(inputFile));
+ case lz4:
+ return new LZ4BlockInputStream(new FileInputStream(inputFile));
+ default:
+ throw new RuntimeException("Unknown compression type " + compressionType);
+ }
case file:
return new FileInputStream(inputFile);
default:
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
index 03f8d184f94..d14f690b2d3 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
@@ -13,6 +13,7 @@ import java.nio.ByteBuffer;
public abstract class FileReferenceData {
public enum Type { file, compressed }
+ public enum CompressionType { gzip, lz4, zstd }
private final FileReference fileReference;
private final String filename;
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index 637fbbbd0a7..f5cd1760e89 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static com.yahoo.jrt.ErrorCode.CONNECTION;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -148,7 +149,7 @@ public class FileDownloaderTest {
File barFile = new File(subdir, "really-long-filename-over-100-bytes-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
IOUtils.writeFile(barFile, "bar", false);
- File tarFile = new FileReferenceCompressor(compressed).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
+ File tarFile = new FileReferenceCompressor(compressed, gzip).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
byte[] tarredContent = IOUtils.readFileBytes(tarFile);
receiveFile(fileReference, filename, compressed, tarredContent);
Optional<File> downloadedFile = getFile(fileReference);
@@ -291,7 +292,7 @@ public class FileDownloaderTest {
FileReferenceData.Type type, byte[] content) {
XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
FileReceiver.Session session =
- new FileReceiver.Session(downloadDir, 1, fileReference, type, filename, content.length);
+ new FileReceiver.Session(downloadDir, 1, fileReference, type, gzip, filename, content.length);
session.addPart(0, content);
File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0));
fileDownloader.downloads().completedDownloading(fileReference, file);
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
index 5c15f945ae3..84e7a07340e 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
@@ -16,7 +16,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.file;
import static org.junit.Assert.assertEquals;
public class FileReceiverTest {
@@ -58,18 +62,25 @@ public class FileReceiverTest {
writerB.close();
File tempFile = temporaryFolder.newFile();
- File file = new FileReferenceCompressor(compressed).compress(dirWithFiles, tempFile);
- transferCompressedData(new FileReference("ref"), "a", IOUtils.readFileBytes(file));
+ File file = new FileReferenceCompressor(compressed, gzip).compress(dirWithFiles, tempFile);
+ transferCompressedData(gzip, new FileReference("ref"), "a", IOUtils.readFileBytes(file));
File downloadDir = new File(root, "ref");
assertEquals("1", IOUtils.readFile(new File(downloadDir, "a")));
assertEquals("2", IOUtils.readFile(new File(downloadDir, "b")));
+
+ tempFile = temporaryFolder.newFile();
+ FileReferenceCompressor compressor = new FileReferenceCompressor(compressed, lz4);
+ file = compressor.compress(dirWithFiles, tempFile);
+ transferCompressedData(lz4, new FileReference("ref"), "a", IOUtils.readFileBytes(file));
+ downloadDir = new File(root, "ref");
+ assertEquals("1", IOUtils.readFile(new File(downloadDir, "a")));
+ assertEquals("2", IOUtils.readFile(new File(downloadDir, "b")));
}
private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException {
byte [] allContent = Utf8.toBytes(all);
- FileReceiver.Session session = new FileReceiver.Session(root, 1, ref,
- FileReferenceData.Type.file, fileName, allContent.length);
+ FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, file, gzip, fileName, allContent.length);
int partSize = (allContent.length+(numParts-1))/numParts;
ByteBuffer bb = ByteBuffer.wrap(allContent);
for (int i = 0, pos = 0; i < numParts; i++) {
@@ -87,8 +98,8 @@ public class FileReceiverTest {
assertEquals(all, Utf8.toString(allReadBytes));
}
- private void transferCompressedData(FileReference ref, String fileName, byte[] data) {
- FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, fileName, data.length);
+ private void transferCompressedData(CompressionType compressionType, FileReference ref, String fileName, byte[] data) {
+ FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, compressionType, fileName, data.length);
session.addPart(0, data);
session.close(hasher.hash(ByteBuffer.wrap(data), 0));
}