aboutsummaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-05-24 18:48:17 +0200
committerHarald Musum <musum@yahooinc.com>2022-05-24 18:48:17 +0200
commitfdd9e97885fe5d1546046b15fee7de47d1617941 (patch)
tree62b8a7f60605e0fff5cb61c5449e8b0a02c29680 /filedistribution
parentd6709d6a9e9d6739874f48ef56f7cb473bf5dfef (diff)
Prepare for different compression algorithms for distributed files
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java24
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java139
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java (renamed from filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java)13
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java5
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java11
5 files changed, 171 insertions, 21 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 748d9eb1003..81a8944149c 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -9,7 +9,6 @@ import com.yahoo.jrt.Request;
import com.yahoo.jrt.Supervisor;
import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;
-
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
@@ -23,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.Type;
+
/**
* When asking for a file reference, this handles RPC callbacks from config server with file data and metadata.
* Uses the same Supervisor as the original caller that requests files, so communication uses the same
@@ -105,18 +106,12 @@ public class FileReceiver {
}
File close(long hash) {
- if (hasher.getValue() != hash)
- throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")");
+ verifyHash(hash);
File file = new File(fileReferenceDir, fileName);
File decompressedDir = null;
try {
- // Unpack if necessary
- if (fileType == FileReferenceData.Type.compressed) {
- decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile();
- CompressedFileReference.decompress(inprogressFile, decompressedDir);
- moveFileToDestination(decompressedDir, fileReferenceDir);
- } else {
+ if (fileType == Type.file) {
try {
Files.createDirectories(fileReferenceDir.toPath());
} catch (IOException e) {
@@ -125,6 +120,10 @@ public class FileReceiver {
}
log.log(Level.FINE, () -> "Uncompressed file, moving to " + file.getAbsolutePath());
moveFileToDestination(inprogressFile, file);
+ } else {
+ decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile();
+ new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir);
+ moveFileToDestination(decompressedDir, fileReferenceDir);
}
} catch (IOException e) {
log.log(Level.SEVERE, "Failed writing file: " + e.getMessage(), e);
@@ -139,6 +138,12 @@ public class FileReceiver {
double percentageReceived() {
return (double)currentFileSize/(double)fileSize;
}
+
+ void verifyHash(long hash) {
+ if (hasher.getValue() != hash)
+ throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")");
+ }
+
}
FileReceiver(Supervisor supervisor, Downloads downloads, File downloadDirectory) {
@@ -291,4 +296,5 @@ public class FileReceiver {
}
return 0;
}
+
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
new file mode 100644
index 00000000000..c36bcd22606
--- /dev/null
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java
@@ -0,0 +1,139 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.filedistribution;
+
+import com.google.common.io.ByteStreams;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Objects;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Utility class for compressing and decompressing files used in a file reference
+ *
+ * @author hmusum
+ */
+public class FileReferenceCompressor {
+
+ private static final Logger log = Logger.getLogger(FileReferenceCompressor.class.getName());
+ private static final int recurseDepth = 100;
+
+ private final FileReferenceData.Type type;
+
+ public FileReferenceCompressor(FileReferenceData.Type type) {
+ this.type = Objects.requireNonNull(type, "Type cannot be null");
+ }
+
+ public File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException {
+ TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(compressedOutputStream(outputFile));
+ archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
+ createArchiveFile(archiveOutputStream, baseDir, inputFiles);
+ return outputFile;
+ }
+
+ public File compress(File directory, File outputFile) throws IOException {
+ return compress(directory,
+ Files.find(Paths.get(directory.getAbsolutePath()),
+ recurseDepth,
+ (p, basicFileAttributes) -> basicFileAttributes.isRegularFile())
+ .map(Path::toFile).collect(Collectors.toList()),
+ outputFile);
+ }
+
+ public void decompress(File inputFile, File outputDir) throws IOException {
+ log.log(Level.FINE, () -> "Decompressing '" + inputFile + "' into '" + outputDir + "'");
+ try (ArchiveInputStream ais = new TarArchiveInputStream(decompressedInputStream(inputFile))) {
+ decompress(ais, outputDir);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("Unable to decompress '" + inputFile.getAbsolutePath() + "': " + e.getMessage());
+ }
+ }
+
+ private static void decompress(ArchiveInputStream archiveInputStream, File outputFile) throws IOException {
+ int entries = 0;
+ ArchiveEntry entry;
+ while ((entry = archiveInputStream.getNextEntry()) != null) {
+ File outFile = new File(outputFile, entry.getName());
+ if (entry.isDirectory()) {
+ if (!(outFile.exists() && outFile.isDirectory())) {
+ log.log(Level.FINE, () -> "Creating dir: " + outFile.getAbsolutePath());
+ if (!outFile.mkdirs()) {
+ log.log(Level.WARNING, "Could not create dir " + entry.getName());
+ }
+ }
+ } else {
+ // Create parent dir if necessary
+ File parent = new File(outFile.getParent());
+ if (!parent.exists() && !parent.mkdirs()) {
+ log.log(Level.WARNING, "Could not create dir " + parent.getAbsolutePath());
+ }
+ FileOutputStream fos = new FileOutputStream(outFile);
+ ByteStreams.copy(archiveInputStream, fos);
+ fos.close();
+ }
+ entries++;
+ }
+ if (entries == 0) {
+ throw new IllegalArgumentException("Not able to read any entries from stream (" +
+ archiveInputStream.getBytesRead() + " bytes read from stream)");
+ }
+ }
+
+ private static void createArchiveFile(ArchiveOutputStream archiveOutputStream, File baseDir, List<File> inputFiles) throws IOException {
+ inputFiles.forEach(file -> {
+ try {
+ writeFileToTar(archiveOutputStream, baseDir, file);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ archiveOutputStream.close();
+ }
+
+ private static void writeFileToTar(ArchiveOutputStream taos, File baseDir, File file) throws IOException {
+ log.log(Level.FINEST, () -> "Adding file to tar: " + baseDir.toPath().relativize(file.toPath()).toString());
+ taos.putArchiveEntry(taos.createArchiveEntry(file, baseDir.toPath().relativize(file.toPath()).toString()));
+ ByteStreams.copy(new FileInputStream(file), taos);
+ taos.closeArchiveEntry();
+ }
+
+ private OutputStream compressedOutputStream(File outputFile) throws IOException {
+ switch (type) {
+ case compressed:
+ return new GZIPOutputStream(new FileOutputStream(outputFile));
+ case file:
+ return new FileOutputStream(outputFile);
+ default:
+ throw new RuntimeException("Unknown file reference type " + type);
+ }
+ }
+
+ private InputStream decompressedInputStream(File inputFile) throws IOException {
+ switch (type) {
+ case compressed:
+ return new GZIPInputStream(new FileInputStream(inputFile));
+ case file:
+ return new FileInputStream(inputFile);
+ default:
+ throw new RuntimeException("Unknown file reference type " + type);
+ }
+ }
+
+}
+
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java
index cbe810fe9b6..4e039bc56ab 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/ZstdFileReference.java
@@ -2,12 +2,12 @@
package com.yahoo.vespa.filedistribution;
import com.google.common.io.ByteStreams;
+import com.yahoo.compress.ZstdCompressor;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -28,15 +28,20 @@ import java.util.zip.GZIPOutputStream;
*
* @author hmusum
*/
-public class CompressedFileReference {
+public class ZstdFileReference {
- private static final Logger log = Logger.getLogger(CompressedFileReference.class.getName());
+ private static final Logger log = Logger.getLogger(ZstdFileReference.class.getName());
private static final int recurseDepth = 100;
public static File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException {
- TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(new FileOutputStream(outputFile)));
+ // Files.createTempFile(, );
+ TarArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new FileOutputStream(outputFile));
archiveOutputStream.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
createArchiveFile(archiveOutputStream, baseDir, inputFiles);
+ try (FileOutputStream out = new FileOutputStream(outputFile);
+ FileInputStream in = new FileInputStream(outputFile)) {
+ out.write(new ZstdCompressor().compress(in.readAllBytes(), 0, 0));
+ }
return outputFile;
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index e8bd63fc083..637fbbbd0a7 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static com.yahoo.jrt.ErrorCode.CONNECTION;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -147,9 +148,9 @@ public class FileDownloaderTest {
File barFile = new File(subdir, "really-long-filename-over-100-bytes-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
IOUtils.writeFile(barFile, "bar", false);
- File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
+ File tarFile = new FileReferenceCompressor(compressed).compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
byte[] tarredContent = IOUtils.readFileBytes(tarFile);
- receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent);
+ receiveFile(fileReference, filename, compressed, tarredContent);
Optional<File> downloadedFile = getFile(fileReference);
assertTrue(downloadedFile.isPresent());
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
index 996f3cc2984..5c15f945ae3 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
@@ -10,15 +10,15 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.assertEquals;
-
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
+import static org.junit.Assert.assertEquals;
+
public class FileReceiverTest {
private File root;
private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
@@ -58,7 +58,7 @@ public class FileReceiverTest {
writerB.close();
File tempFile = temporaryFolder.newFile();
- File file = CompressedFileReference.compress(dirWithFiles, tempFile);
+ File file = new FileReferenceCompressor(compressed).compress(dirWithFiles, tempFile);
transferCompressedData(new FileReference("ref"), "a", IOUtils.readFileBytes(file));
File downloadDir = new File(root, "ref");
assertEquals("1", IOUtils.readFile(new File(downloadDir, "a")));
@@ -88,8 +88,7 @@ public class FileReceiverTest {
}
private void transferCompressedData(FileReference ref, String fileName, byte[] data) {
- FileReceiver.Session session =
- new FileReceiver.Session(root, 1, ref, FileReferenceData.Type.compressed, fileName, data.length);
+ FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, compressed, fileName, data.length);
session.addPart(0, data);
session.close(hasher.hash(ByteBuffer.wrap(data), 0));
}