diff options
Diffstat (limited to 'filedistribution')
5 files changed, 63 insertions, 19 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 81a8944149c..65c6dd5931d 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -23,6 +23,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; /** * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata. @@ -48,7 +49,8 @@ public class FileReceiver { private final StreamingXXHash64 hasher; private final int sessionId; private final FileReference reference; - private final FileReferenceData.Type fileType; + private final Type fileType; + private final CompressionType compressionType; private final String fileName; private final long fileSize; private long currentFileSize; @@ -58,13 +60,18 @@ public class FileReceiver { private final File tmpDir; private final File inprogressFile; - Session(File downloadDirectory, int sessionId, FileReference reference, - FileReferenceData.Type fileType, String fileName, long fileSize) - { + Session(File downloadDirectory, + int sessionId, + FileReference reference, + Type fileType, + FileReferenceData.CompressionType compressionType, + String fileName, + long fileSize) { this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0); this.sessionId = sessionId; this.reference = reference; this.fileType = fileType; + this.compressionType = compressionType; this.fileName = fileName; this.fileSize = fileSize; currentFileSize = 0; @@ -122,7 +129,7 @@ public class FileReceiver { moveFileToDestination(inprogressFile, file); } else { decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); - new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir); + new FileReferenceCompressor(fileType, compressionType).decompress(inprogressFile, decompressedDir); moveFileToDestination(decompressedDir, fileReferenceDir); } } catch (IOException e) { @@ -161,11 +168,12 @@ public class FileReceiver { // receiveFile after getting a serveFile method call). handler needs to implement receiveFile* methods private List<Method> receiveFileMethod() { List<Method> methods = new ArrayList<>(); - methods.add(new Method(RECEIVE_META_METHOD, "sssl", "ii", this::receiveFileMeta) + methods.add(new Method(RECEIVE_META_METHOD, "sssl*", "ii", this::receiveFileMeta) .paramDesc(0, "filereference", "file reference to download") .paramDesc(1, "filename", "filename") .paramDesc(2, "type", "'file' or 'compressed'") .paramDesc(3, "filelength", "length in bytes of file") + .paramDesc(3, "compressionType", "compression type: gzip, lz4, zstd") .returnDesc(0, "ret", "0 if success, 1 otherwise") .returnDesc(1, "session-id", "Session id to be used for this transfer")); methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", this::receiveFilePart) @@ -220,8 +228,11 @@ public class FileReceiver { log.log(Level.FINE, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); FileReference reference = new FileReference(req.parameters().get(0).asString()); String fileName = req.parameters().get(1).asString(); - String type = req.parameters().get(2).asString(); + Type type = FileReferenceData.Type.valueOf(req.parameters().get(2).asString()); long fileSize = req.parameters().get(3).asInt64(); + CompressionType compressionType = (req.parameters().size() > 4) + ? CompressionType.valueOf(req.parameters().get(4).asString()) + : CompressionType.gzip; // fallback/legacy compression type int sessionId = nextSessionId.getAndIncrement(); int retval = 0; synchronized (sessions) { @@ -231,7 +242,7 @@ public class FileReceiver { } else { try { sessions.put(sessionId, new Session(downloadDirectory, sessionId, reference, - FileReferenceData.Type.valueOf(type),fileName, fileSize)); + type, compressionType, fileName, fileSize)); } catch (Exception e) { retval = 1; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java index c36bcd22606..b485e6ded86 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java @@ -2,6 +2,8 @@ package com.yahoo.vespa.filedistribution; import com.google.common.io.ByteStreams; +import net.jpountz.lz4.LZ4BlockInputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.ArchiveInputStream; import org.apache.commons.compress.archivers.ArchiveOutputStream; @@ -35,9 +37,11 @@ public class FileReferenceCompressor { private static final int recurseDepth = 100; private final FileReferenceData.Type type; + private final FileReferenceData.CompressionType compressionType; - public FileReferenceCompressor(FileReferenceData.Type type) { + public FileReferenceCompressor(FileReferenceData.Type type, FileReferenceData.CompressionType compressionType) { this.type = Objects.requireNonNull(type, "Type cannot be null"); + this.compressionType = Objects.requireNonNull(compressionType, "Compression type cannot be null"); } public File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException { @@ -114,9 +118,17 @@ public class FileReferenceCompressor { } private OutputStream compressedOutputStream(File outputFile) throws IOException { + log.log(Level.FINE, () -> "Compressing with type " + type + " and compression type " + compressionType); switch (type) { case compressed: - return new GZIPOutputStream(new FileOutputStream(outputFile)); + switch (compressionType) { + case gzip: + return new GZIPOutputStream(new FileOutputStream(outputFile)); + case lz4: + return new LZ4BlockOutputStream(new FileOutputStream(outputFile)); + default: + throw new RuntimeException("Unknown compression type " + compressionType); + } case file: return new FileOutputStream(outputFile); default: @@ -125,9 +137,17 @@ public class FileReferenceCompressor { } private InputStream decompressedInputStream(File inputFile) throws IOException { + log.log(Level.FINE, () -> "Decompressing with type " + type + " and compression type " + compressionType); switch (type) { case compressed: - return new GZIPInputStream(new FileInputStream(inputFile)); + switch (compressionType) { + case gzip: + return new GZIPInputStream(new FileInputStream(inputFile)); + case lz4: + return new LZ4BlockInputStream(new FileInputStream(inputFile)); + default: + throw new RuntimeException("Unknown compression type " + compressionType); + } case file: return new FileInputStream(inputFile); default: diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java index 03f8d184f94..d14f690b2d3 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java @@ -13,6 +13,7 @@ import java.nio.ByteBuffer; public abstract class FileReferenceData { public enum Type { file, compressed } + public enum CompressionType { gzip, lz4, zstd } private final FileReference fileReference; private final String filename; diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 637fbbbd0a7..f5cd1760e89 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import static com.yahoo.jrt.ErrorCode.CONNECTION; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -148,7 +149,7 @@ public class FileDownloaderTest { File barFile = new File(subdir, "really-long-filename-over-100-bytes-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); IOUtils.writeFile(barFile, "bar", false); - File tarFile = new FileReferenceCompressor(compressed).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); + File tarFile = new FileReferenceCompressor(compressed, gzip).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); byte[] tarredContent = IOUtils.readFileBytes(tarFile); receiveFile(fileReference, filename, compressed, tarredContent); Optional<File> downloadedFile = getFile(fileReference); @@ -291,7 +292,7 @@ public class FileDownloaderTest { FileReferenceData.Type type, byte[] content) { XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); FileReceiver.Session session = - new FileReceiver.Session(downloadDir, 1, fileReference, type, filename, content.length); + new FileReceiver.Session(downloadDir, 1, fileReference, type, gzip, filename, content.length); session.addPart(0, content); File file = session.close(hasher.hash(ByteBuffer.wrap(content), 0)); fileDownloader.downloads().completedDownloading(fileReference, file); diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java index 5c15f945ae3..84e7a07340e 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -16,7 +16,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.file; import static org.junit.Assert.assertEquals; public class FileReceiverTest { @@ -58,18 +62,25 @@ public class FileReceiverTest { writerB.close(); File tempFile = temporaryFolder.newFile(); - File file = new FileReferenceCompressor(compressed).compress(dirWithFiles, tempFile); - transferCompressedData(new FileReference("ref"), "a", IOUtils.readFileBytes(file)); + File file = new FileReferenceCompressor(compressed, gzip).compress(dirWithFiles, tempFile); + transferCompressedData(gzip, new FileReference("ref"), "a", IOUtils.readFileBytes(file)); File downloadDir = new File(root, "ref"); assertEquals("1", IOUtils.readFile(new File(downloadDir, "a"))); assertEquals("2", IOUtils.readFile(new File(downloadDir, "b"))); + + tempFile = temporaryFolder.newFile(); + FileReferenceCompressor compressor = new FileReferenceCompressor(compressed, lz4); + file = compressor.compress(dirWithFiles, tempFile); + transferCompressedData(lz4, new FileReference("ref"), "a", IOUtils.readFileBytes(file)); + downloadDir = new File(root, "ref"); + assertEquals("1", IOUtils.readFile(new File(downloadDir, "a"))); + assertEquals("2", IOUtils.readFile(new File(downloadDir, "b"))); } private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException { byte [] allContent = Utf8.toBytes(all); - FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, - FileReferenceData.Type.file, fileName, allContent.length); + FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, file, gzip, fileName, allContent.length); int partSize = (allContent.length+(numParts-1))/numParts; ByteBuffer bb = ByteBuffer.wrap(allContent); for (int i = 0, pos = 0; i < numParts; i++) { @@ -87,8 +98,8 @@ public class FileReceiverTest { assertEquals(all, Utf8.toString(allReadBytes)); } - private void transferCompressedData(FileReference ref, String fileName, byte[] data) { - FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, fileName, data.length); + private void transferCompressedData(CompressionType compressionType, FileReference ref, String fileName, byte[] data) { + FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, compressionType, fileName, data.length); session.addPart(0, data); session.close(hasher.hash(ByteBuffer.wrap(data), 0)); } |