diff options
author | Harald Musum <musum@yahooinc.com> | 2022-05-24 18:48:17 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-05-24 18:48:17 +0200 |
commit | fdd9e97885fe5d1546046b15fee7de47d1617941 (patch) | |
tree | 62b8a7f60605e0fff5cb61c5449e8b0a02c29680 /filedistribution | |
parent | d6709d6a9e9d6739874f48ef56f7cb473bf5dfef (diff) |
Prepare for different compression algorithms for distributed files
Diffstat (limited to 'filedistribution')
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java | 24 | ||||
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java | 139 | ||||
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java (renamed from filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java) | 13 | ||||
-rw-r--r-- | filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java | 5 | ||||
-rw-r--r-- | filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java | 11 |
5 files changed, 171 insertions, 21 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 748d9eb1003..81a8944149c 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -9,7 +9,6 @@ import com.yahoo.jrt.Request; import com.yahoo.jrt.Supervisor; import net.jpountz.xxhash.StreamingXXHash64; import net.jpountz.xxhash.XXHashFactory; - import java.io.File; import java.io.IOException; import java.nio.file.FileAlreadyExistsException; @@ -23,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type; + /** * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata. * Uses the same Supervisor as the original caller that requests files, so communication uses the same @@ -105,18 +106,12 @@ public class FileReceiver { } File close(long hash) { - if (hasher.getValue() != hash) - throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")"); + verifyHash(hash); File file = new File(fileReferenceDir, fileName); File decompressedDir = null; try { - // Unpack if necessary - if (fileType == FileReferenceData.Type.compressed) { - decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); - CompressedFileReference.decompress(inprogressFile, decompressedDir); - moveFileToDestination(decompressedDir, fileReferenceDir); - } else { + if (fileType == Type.file) { try { Files.createDirectories(fileReferenceDir.toPath()); } catch (IOException e) { @@ -125,6 +120,10 @@ public class FileReceiver { } log.log(Level.FINE, () -> "Uncompressed file, moving to " + file.getAbsolutePath()); moveFileToDestination(inprogressFile, file); + } else { + decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); + new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); } } catch (IOException e) { log.log(Level.SEVERE, "Failed writing file: " + e.getMessage(), e); @@ -139,6 +138,12 @@ public class FileReceiver { double percentageReceived() { return (double)currentFileSize/(double)fileSize; } + + void verifyHash(long hash) { + if (hasher.getValue() != hash) + throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")"); + } + } FileReceiver(Supervisor supervisor, Downloads downloads, File downloadDirectory) { @@ -291,4 +296,5 @@ public class FileReceiver { } return 0; } + } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java new file mode 100644 index 00000000000..c36bcd22606 --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java @@ -0,0 +1,139 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.filedistribution; + +import com.google.common.io.ByteStreams; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveInputStream; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Objects; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Utility class for compressing and decompressing files used in a file reference + * + * @author hmusum + */ +public class FileReferenceCompressor { + + private static final Logger log = Logger.getLogger(FileReferenceCompressor.class.getName()); + private static final int recurseDepth = 100; + + private final FileReferenceData.Type type; + + public FileReferenceCompressor(FileReferenceData.Type type) { + this.type = Objects.requireNonNull(type, "Type cannot be null"); + } + + public File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException { + TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(compressedOutputStream(outputFile)); + archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); + createArchiveFile(archiveOutputStream, baseDir, inputFiles); + return outputFile; + } + + public File compress(File directory, File outputFile) throws IOException { + return compress(directory, + Files.find(Paths.get(directory.getAbsolutePath()), + recurseDepth, + (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile).collect(Collectors.toList()), + outputFile); + } + + public void decompress(File inputFile, File outputDir) throws IOException { + log.log(Level.FINE, () -> "Decompressing '" + inputFile + "' into '" + outputDir + "'"); + try (ArchiveInputStream ais = new TarArchiveInputStream(decompressedInputStream(inputFile))) { + decompress(ais, outputDir); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Unable to decompress '" + inputFile.getAbsolutePath() + "': " + e.getMessage()); + } + } + + private static void decompress(ArchiveInputStream archiveInputStream, File outputFile) throws IOException { + int entries = 0; + ArchiveEntry entry; + while ((entry = archiveInputStream.getNextEntry()) != null) { + File outFile = new File(outputFile, entry.getName()); + if (entry.isDirectory()) { + if (!(outFile.exists() && outFile.isDirectory())) { + log.log(Level.FINE, () -> "Creating dir: " + outFile.getAbsolutePath()); + if (!outFile.mkdirs()) { + log.log(Level.WARNING, "Could not create dir " + entry.getName()); + } + } + } else { + // Create parent dir if necessary + File parent = new File(outFile.getParent()); + if (!parent.exists() && !parent.mkdirs()) { + log.log(Level.WARNING, "Could not create dir " + parent.getAbsolutePath()); + } + FileOutputStream fos = new FileOutputStream(outFile); + ByteStreams.copy(archiveInputStream, fos); + fos.close(); + } + entries++; + } + if (entries == 0) { + throw new IllegalArgumentException("Not able to read any entries from stream (" + + archiveInputStream.getBytesRead() + " bytes read from stream)"); + } + } + + private static void createArchiveFile(ArchiveOutputStream archiveOutputStream, File baseDir, List<File> inputFiles) throws IOException { + inputFiles.forEach(file -> { + try { + writeFileToTar(archiveOutputStream, baseDir, file); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + archiveOutputStream.close(); + } + + private static void writeFileToTar(ArchiveOutputStream taos, File baseDir, File file) throws IOException { + log.log(Level.FINEST, () -> "Adding file to tar: " + baseDir.toPath().relativize(file.toPath()).toString()); + taos.putArchiveEntry(taos.createArchiveEntry(file, baseDir.toPath().relativize(file.toPath()).toString())); + ByteStreams.copy(new FileInputStream(file), taos); + taos.closeArchiveEntry(); + } + + private OutputStream compressedOutputStream(File outputFile) throws IOException { + switch (type) { + case compressed: + return new GZIPOutputStream(new FileOutputStream(outputFile)); + case file: + return new FileOutputStream(outputFile); + default: + throw new RuntimeException("Unknown file reference type " + type); + } + } + + private InputStream decompressedInputStream(File inputFile) throws IOException { + switch (type) { + case compressed: + return new GZIPInputStream(new FileInputStream(inputFile)); + case file: + return new FileInputStream(inputFile); + default: + throw new RuntimeException("Unknown file reference type " + type); + } + } + +} + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java index cbe810fe9b6..4e039bc56ab 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java @@ -2,12 +2,12 @@ package com.yahoo.vespa.filedistribution; import com.google.common.io.ByteStreams; +import com.yahoo.compress.ZstdCompressor; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.ArchiveInputStream; import org.apache.commons.compress.archivers.ArchiveOutputStream; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; @@ -28,15 +28,20 @@ import java.util.zip.GZIPOutputStream; * * @author hmusum */ -public class CompressedFileReference { +public class ZstdFileReference { - private static final Logger log = Logger.getLogger(CompressedFileReference.class.getName()); + private static final Logger log = Logger.getLogger(ZstdFileReference.class.getName()); private static final int recurseDepth = 100; public static File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException { - TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(new FileOutputStream(outputFile))); + // Files.createTempFile(, ); + TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new FileOutputStream(outputFile)); archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); createArchiveFile(archiveOutputStream, baseDir, inputFiles); + try (FileOutputStream out = new FileOutputStream(outputFile); + FileInputStream in = new FileInputStream(outputFile)) { + out.write(new ZstdCompressor().compress(in.readAllBytes(), 0, 0)); + } return outputFile; } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index e8bd63fc083..637fbbbd0a7 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import static com.yahoo.jrt.ErrorCode.CONNECTION; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -147,9 +148,9 @@ public class FileDownloaderTest { File barFile = new File(subdir, "really-long-filename-over-100-bytes-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); IOUtils.writeFile(barFile, "bar", false); - File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); + File tarFile = new FileReferenceCompressor(compressed).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); byte[] tarredContent = IOUtils.readFileBytes(tarFile); - receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent); + receiveFile(fileReference, filename, compressed, tarredContent); Optional<File> downloadedFile = getFile(fileReference); assertTrue(downloadedFile.isPresent()); diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java index 996f3cc2984..5c15f945ae3 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -10,15 +10,15 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - -import static org.junit.Assert.assertEquals; - import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; +import static org.junit.Assert.assertEquals; + public class FileReceiverTest { private File root; private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); @@ -58,7 +58,7 @@ public class FileReceiverTest { writerB.close(); File tempFile = temporaryFolder.newFile(); - File file = CompressedFileReference.compress(dirWithFiles, tempFile); + File file = new FileReferenceCompressor(compressed).compress(dirWithFiles, tempFile); transferCompressedData(new FileReference("ref"), "a", IOUtils.readFileBytes(file)); File downloadDir = new File(root, "ref"); assertEquals("1", IOUtils.readFile(new File(downloadDir, "a"))); @@ -88,8 +88,7 @@ public class FileReceiverTest { } private void transferCompressedData(FileReference ref, String fileName, byte[] data) { - FileReceiver.Session session = - new FileReceiver.Session(root, 1, ref, FileReferenceData.Type.compressed, fileName, data.length); + FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, fileName, data.length); session.addPart(0, data); session.close(hasher.hash(ByteBuffer.wrap(data), 0)); } |