diff options
author | Harald Musum <musum@yahooinc.com> | 2022-05-24 18:48:17 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-05-24 18:48:17 +0200 |
commit | fdd9e97885fe5d1546046b15fee7de47d1617941 (patch) | |
tree | 62b8a7f60605e0fff5cb61c5449e8b0a02c29680 /filedistribution/src/main/java/com/yahoo/vespa | |
parent | d6709d6a9e9d6739874f48ef56f7cb473bf5dfef (diff) |
Prepare for different compression algorithms for distributed files
Diffstat (limited to 'filedistribution/src/main/java/com/yahoo/vespa')
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java | 24 | ||||
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java | 139 | ||||
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java (renamed from filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java) | 13 |
3 files changed, 163 insertions, 13 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 748d9eb1003..81a8944149c 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -9,7 +9,6 @@ import com.yahoo.jrt.Request; import com.yahoo.jrt.Supervisor; import net.jpountz.xxhash.StreamingXXHash64; import net.jpountz.xxhash.XXHashFactory; - import java.io.File; import java.io.IOException; import java.nio.file.FileAlreadyExistsException; @@ -23,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.vespa.filedistribution.FileReferenceData.Type; + /** * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata. * Uses the same Supervisor as the original caller that requests files, so communication uses the same @@ -105,18 +106,12 @@ public class FileReceiver { } File close(long hash) { - if (hasher.getValue() != hash) - throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")"); + verifyHash(hash); File file = new File(fileReferenceDir, fileName); File decompressedDir = null; try { - // Unpack if necessary - if (fileType == FileReferenceData.Type.compressed) { - decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); - CompressedFileReference.decompress(inprogressFile, decompressedDir); - moveFileToDestination(decompressedDir, fileReferenceDir); - } else { + if (fileType == Type.file) { try { Files.createDirectories(fileReferenceDir.toPath()); } catch (IOException e) { @@ -125,6 +120,10 @@ public class FileReceiver { } log.log(Level.FINE, () -> "Uncompressed file, moving to " + file.getAbsolutePath()); moveFileToDestination(inprogressFile, file); + } else { + decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); + new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); } } catch (IOException e) { log.log(Level.SEVERE, "Failed writing file: " + e.getMessage(), e); @@ -139,6 +138,12 @@ public class FileReceiver { double percentageReceived() { return (double)currentFileSize/(double)fileSize; } + + void verifyHash(long hash) { + if (hasher.getValue() != hash) + throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")"); + } + } FileReceiver(Supervisor supervisor, Downloads downloads, File downloadDirectory) { @@ -291,4 +296,5 @@ public class FileReceiver { } return 0; } + } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java new file mode 100644 index 00000000000..c36bcd22606 --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java @@ -0,0 +1,139 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.filedistribution; + +import com.google.common.io.ByteStreams; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveInputStream; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Objects; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Utility class for compressing and decompressing files used in a file reference + * + * @author hmusum + */ +public class FileReferenceCompressor { + + private static final Logger log = Logger.getLogger(FileReferenceCompressor.class.getName()); + private static final int recurseDepth = 100; + + private final FileReferenceData.Type type; + + public FileReferenceCompressor(FileReferenceData.Type type) { + this.type = Objects.requireNonNull(type, "Type cannot be null"); + } + + public File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException { + TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(compressedOutputStream(outputFile)); + archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); + createArchiveFile(archiveOutputStream, baseDir, inputFiles); + return outputFile; + } + + public File compress(File directory, File outputFile) throws IOException { + return compress(directory, + Files.find(Paths.get(directory.getAbsolutePath()), + recurseDepth, + (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile).collect(Collectors.toList()), + outputFile); + } + + public void decompress(File inputFile, File outputDir) throws IOException { + log.log(Level.FINE, () -> "Decompressing '" + inputFile + "' into '" + outputDir + "'"); + try (ArchiveInputStream ais = new TarArchiveInputStream(decompressedInputStream(inputFile))) { + decompress(ais, outputDir); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Unable to decompress '" + inputFile.getAbsolutePath() + "': " + e.getMessage()); + } + } + + private static void decompress(ArchiveInputStream archiveInputStream, File outputFile) throws IOException { + int entries = 0; + ArchiveEntry entry; + while ((entry = archiveInputStream.getNextEntry()) != null) { + File outFile = new File(outputFile, entry.getName()); + if (entry.isDirectory()) { + if (!(outFile.exists() && outFile.isDirectory())) { + log.log(Level.FINE, () -> "Creating dir: " + outFile.getAbsolutePath()); + if (!outFile.mkdirs()) { + log.log(Level.WARNING, "Could not create dir " + entry.getName()); + } + } + } else { + // Create parent dir if necessary + File parent = new File(outFile.getParent()); + if (!parent.exists() && !parent.mkdirs()) { + log.log(Level.WARNING, "Could not create dir " + parent.getAbsolutePath()); + } + FileOutputStream fos = new FileOutputStream(outFile); + ByteStreams.copy(archiveInputStream, fos); + fos.close(); + } + entries++; + } + if (entries == 0) { + throw new IllegalArgumentException("Not able to read any entries from stream (" + + archiveInputStream.getBytesRead() + " bytes read from stream)"); + } + } + + private static void createArchiveFile(ArchiveOutputStream archiveOutputStream, File baseDir, List<File> inputFiles) throws IOException { + inputFiles.forEach(file -> { + try { + writeFileToTar(archiveOutputStream, baseDir, file); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + archiveOutputStream.close(); + } + + private static void writeFileToTar(ArchiveOutputStream taos, File baseDir, File file) throws IOException { + log.log(Level.FINEST, () -> "Adding file to tar: " + baseDir.toPath().relativize(file.toPath()).toString()); + taos.putArchiveEntry(taos.createArchiveEntry(file, baseDir.toPath().relativize(file.toPath()).toString())); + ByteStreams.copy(new FileInputStream(file), taos); + taos.closeArchiveEntry(); + } + + private OutputStream compressedOutputStream(File outputFile) throws IOException { + switch (type) { + case compressed: + return new GZIPOutputStream(new FileOutputStream(outputFile)); + case file: + return new FileOutputStream(outputFile); + default: + throw new RuntimeException("Unknown file reference type " + type); + } + } + + private InputStream decompressedInputStream(File inputFile) throws IOException { + switch (type) { + case compressed: + return new GZIPInputStream(new FileInputStream(inputFile)); + case file: + return new FileInputStream(inputFile); + default: + throw new RuntimeException("Unknown file reference type " + type); + } + } + +} + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java index cbe810fe9b6..4e039bc56ab 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java @@ -2,12 +2,12 @@ package com.yahoo.vespa.filedistribution; import com.google.common.io.ByteStreams; +import com.yahoo.compress.ZstdCompressor; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.ArchiveInputStream; import org.apache.commons.compress.archivers.ArchiveOutputStream; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; @@ -28,15 +28,20 @@ import java.util.zip.GZIPOutputStream; * * @author hmusum */ -public class CompressedFileReference { +public class ZstdFileReference { - private static final Logger log = Logger.getLogger(CompressedFileReference.class.getName()); + private static final Logger log = Logger.getLogger(ZstdFileReference.class.getName()); private static final int recurseDepth = 100; public static File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException { - TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(new FileOutputStream(outputFile))); + // Files.createTempFile(, ); + TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new FileOutputStream(outputFile)); archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); createArchiveFile(archiveOutputStream, baseDir, inputFiles); + try (FileOutputStream out = new FileOutputStream(outputFile); + FileInputStream in = new FileInputStream(outputFile)) { + out.write(new ZstdCompressor().compress(in.readAllBytes(), 0, 0)); + } return outputFile; } |