summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java27
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java26
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java1
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java5
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java23
5 files changed, 63 insertions, 19 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 81a8944149c..65c6dd5931d 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -23,6 +23,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
/**
* When asking for a file reference, this handles RPC callbacks from config server with file data and metadata.
@@ -48,7 +49,8 @@ public class FileReceiver {
private final StreamingXXHash64 hasher;
private final int sessionId;
private final FileReference reference;
- private final FileReferenceData.Type fileType;
+ private final Type fileType;
+ private final CompressionType compressionType;
private final String fileName;
private final long fileSize;
private long currentFileSize;
@@ -58,13 +60,18 @@ public class FileReceiver {
private final File tmpDir;
private final File inprogressFile;
- Session(File downloadDirectory, int sessionId, FileReference reference,
- FileReferenceData.Type fileType, String fileName, long fileSize)
- {
+ Session(File downloadDirectory,
+ int sessionId,
+ FileReference reference,
+ Type fileType,
+ FileReferenceData.CompressionType compressionType,
+ String fileName,
+ long fileSize) {
this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0);
this.sessionId = sessionId;
this.reference = reference;
this.fileType = fileType;
+ this.compressionType = compressionType;
this.fileName = fileName;
this.fileSize = fileSize;
currentFileSize = 0;
@@ -122,7 +129,7 @@ public class FileReceiver {
moveFileToDestination(inprogressFile, file);
} else {
decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile();
- new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir);
+ new FileReferenceCompressor(fileType, compressionType).decompress(inprogressFile, decompressedDir);
moveFileToDestination(decompressedDir, fileReferenceDir);
}
} catch (IOException e) {
@@ -161,11 +168,12 @@ public class FileReceiver {
// receiveFile after getting a serveFile method call). handler needs to implement receiveFile* methods
private List<Method> receiveFileMethod() {
List<Method> methods = new ArrayList<>();
- methods.add(new Method(RECEIVE_META_METHOD, "sssl", "ii", this::receiveFileMeta)
+ methods.add(new Method(RECEIVE_META_METHOD, "sssl*", "ii", this::receiveFileMeta)
.paramDesc(0, "filereference", "file reference to download")
.paramDesc(1, "filename", "filename")
.paramDesc(2, "type", "'file' or 'compressed'")
.paramDesc(3, "filelength", "length in bytes of file")
+ .paramDesc(3, "compressionType", "compression type: gzip, lz4, zstd")
.returnDesc(0, "ret", "0 if success, 1 otherwise")
.returnDesc(1, "session-id", "Session id to be used for this transfer"));
methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", this::receiveFilePart)
@@ -220,8 +228,11 @@ public class FileReceiver {
log.log(Level.FINE, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
FileReference reference = new FileReference(req.parameters().get(0).asString());
String fileName = req.parameters().get(1).asString();
- String type = req.parameters().get(2).asString();
+ Type type = FileReferenceData.Type.valueOf(req.parameters().get(2).asString());
long fileSize = req.parameters().get(3).asInt64();
+ CompressionType compressionType = (req.parameters().size() > 4)
+ ? CompressionType.valueOf(req.parameters().get(4).asString())
+ : CompressionType.gzip; // fallback/legacy compression type
int sessionId = nextSessionId.getAndIncrement();
int retval = 0;
synchronized (sessions) {
@@ -231,7 +242,7 @@ public class FileReceiver {
} else {
try {
sessions.put(sessionId, new Session(downloadDirectory, sessionId, reference,
- FileReferenceData.Type.valueOf(type),fileName, fileSize));
+ type, compressionType, fileName, fileSize));
} catch (Exception e) {
retval = 1;
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
index c36bcd22606..b485e6ded86 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
@@ -2,6 +2,8 @@
package com.yahoo.vespa.filedistribution;
import com.google.common.io.ByteStreams;
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
@@ -35,9 +37,11 @@ public class FileReferenceCompressor {
private static final int recurseDepth = 100;
private final FileReferenceData.Type type;
+ private final FileReferenceData.CompressionType compressionType;
- public FileReferenceCompressor(FileReferenceData.Type type) {
+ public FileReferenceCompressor(FileReferenceData.Type type, FileReferenceData.CompressionType compressionType) {
this.type = Objects.requireNonNull(type, "Type cannot be null");
+ this.compressionType = Objects.requireNonNull(compressionType, "Compression type cannot be null");
}
public File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException {
@@ -114,9 +118,17 @@ public class FileReferenceCompressor {
}
private OutputStream compressedOutputStream(File outputFile) throws IOException {
+ log.log(Level.FINE, () -> "Compressing with type " + type + " and compression type " + compressionType);
switch (type) {
case compressed:
- return new GZIPOutputStream(new FileOutputStream(outputFile));
+ switch (compressionType) {
+ case gzip:
+ return new GZIPOutputStream(new FileOutputStream(outputFile));
+ case lz4:
+ return new LZ4BlockOutputStream(new FileOutputStream(outputFile));
+ default:
+ throw new RuntimeException("Unknown compression type " + compressionType);
+ }
case file:
return new FileOutputStream(outputFile);
default:
@@ -125,9 +137,17 @@ public class FileReferenceCompressor {
}
private InputStream decompressedInputStream(File inputFile) throws IOException {
+ log.log(Level.FINE, () -> "Decompressing with type " + type + " and compression type " + compressionType);
switch (type) {
case compressed:
- return new GZIPInputStream(new FileInputStream(inputFile));
+ switch (compressionType) {
+ case gzip:
+ return new GZIPInputStream(new FileInputStream(inputFile));
+ case lz4:
+ return new LZ4BlockInputStream(new FileInputStream(inputFile));
+ default:
+ throw new RuntimeException("Unknown compression type " + compressionType);
+ }
case file:
return new FileInputStream(inputFile);
default:
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
index 03f8d184f94..d14f690b2d3 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
@@ -13,6 +13,7 @@ import java.nio.ByteBuffer;
public abstract class FileReferenceData {
public enum Type { file, compressed }
+ public enum CompressionType { gzip, lz4, zstd }
private final FileReference fileReference;
private final String filename;
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index 637fbbbd0a7..f5cd1760e89 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static com.yahoo.jrt.ErrorCode.CONNECTION;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -148,7 +149,7 @@ public class FileDownloaderTest {
File barFile = new File(subdir, "really-long-filename-over-100-bytes-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
IOUtils.writeFile(barFile, "bar", false);
- File tarFile = new FileReferenceCompressor(compressed).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
+ File tarFile = new FileReferenceCompressor(compressed, gzip).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
byte[] tarredContent = IOUtils.readFileBytes(tarFile);
receiveFile(fileReference, filename, compressed, tarredContent);
Optional<File> downloadedFile = getFile(fileReference);
@@ -291,7 +292,7 @@ public class FileDownloaderTest {
FileReferenceData.Type type, byte[] content) {
XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
FileReceiver.Session session =
- new FileReceiver.Session(downloadDir, 1, fileReference, type, filename, content.length);
+ new FileReceiver.Session(downloadDir, 1, fileReference, type, gzip, filename, content.length);
session.addPart(0, content);
File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0));
fileDownloader.downloads().completedDownloading(fileReference, file);
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
index 5c15f945ae3..84e7a07340e 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
@@ -16,7 +16,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.file;
import static org.junit.Assert.assertEquals;
public class FileReceiverTest {
@@ -58,18 +62,25 @@ public class FileReceiverTest {
writerB.close();
File tempFile = temporaryFolder.newFile();
- File file = new FileReferenceCompressor(compressed).compress(dirWithFiles, tempFile);
- transferCompressedData(new FileReference("ref"), "a", IOUtils.readFileBytes(file));
+ File file = new FileReferenceCompressor(compressed, gzip).compress(dirWithFiles, tempFile);
+ transferCompressedData(gzip, new FileReference("ref"), "a", IOUtils.readFileBytes(file));
File downloadDir = new File(root, "ref");
assertEquals("1", IOUtils.readFile(new File(downloadDir, "a")));
assertEquals("2", IOUtils.readFile(new File(downloadDir, "b")));
+
+ tempFile = temporaryFolder.newFile();
+ FileReferenceCompressor compressor = new FileReferenceCompressor(compressed, lz4);
+ file = compressor.compress(dirWithFiles, tempFile);
+ transferCompressedData(lz4, new FileReference("ref"), "a", IOUtils.readFileBytes(file));
+ downloadDir = new File(root, "ref");
+ assertEquals("1", IOUtils.readFile(new File(downloadDir, "a")));
+ assertEquals("2", IOUtils.readFile(new File(downloadDir, "b")));
}
private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException {
byte [] allContent = Utf8.toBytes(all);
- FileReceiver.Session session = new FileReceiver.Session(root, 1, ref,
- FileReferenceData.Type.file, fileName, allContent.length);
+ FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, file, gzip, fileName, allContent.length);
int partSize = (allContent.length+(numParts-1))/numParts;
ByteBuffer bb = ByteBuffer.wrap(allContent);
for (int i = 0, pos = 0; i < numParts; i++) {
@@ -87,8 +98,8 @@ public class FileReceiverTest {
assertEquals(all, Utf8.toString(allReadBytes));
}
- private void transferCompressedData(FileReference ref, String fileName, byte[] data) {
- FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, fileName, data.length);
+ private void transferCompressedData(CompressionType compressionType, FileReference ref, String fileName, byte[] data) {
+ FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, compressionType, fileName, data.length);
session.addPart(0, data);
session.close(hasher.hash(ByteBuffer.wrap(data), 0));
}