diff options
author | Harald Musum <musum@oath.com> | 2017-12-07 13:18:16 +0100 |
---|---|---|
committer | Harald Musum <musum@oath.com> | 2017-12-07 13:18:16 +0100 |
commit | 423a955e4a975d4f1dde550df2cea018882d4035 (patch) | |
tree | d73dcbd41b924c20e467f5983f42cd397567074e /filedistribution | |
parent | 56deb93f655849712b4cc17fc69df6331e0253a3 (diff) |
Handle that a file reference is a directory with many files
Compress on the fly if asked for such a file reference
Diffstat (limited to 'filedistribution')
7 files changed, 264 insertions, 46 deletions
diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml index b0bdaff518f..d9699b700d0 100644 --- a/filedistribution/pom.xml +++ b/filedistribution/pom.xml @@ -54,6 +54,10 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> </dependencies> <build> diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java new file mode 100644 index 00000000000..759a859253e --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java @@ -0,0 +1,111 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.filedistribution; + +import com.google.common.io.ByteStreams; +import com.yahoo.log.LogLevel; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveInputStream; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Utility class for compressing and decompressing files used in a file reference + * + * @author hmusum + */ +public class CompressedFileReference { + + private static final Logger log = Logger.getLogger(CompressedFileReference.class.getName()); + private static final int recurseDepth = 100; + + public static File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException { + ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(new FileOutputStream(outputFile))); + createArchiveFile(archiveOutputStream, baseDir, inputFiles); + return outputFile; + } + + public static byte[] compress(File directory) throws IOException { + return compress(directory, Files.find(Paths.get(directory.getAbsolutePath()), + recurseDepth, + (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile).collect(Collectors.toList())); + } + + public static byte[] compress(File baseDir, List<File> inputFiles) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(out)); + createArchiveFile(archiveOutputStream, baseDir, inputFiles); + return out.toByteArray(); + } + + static void decompress(File inputFile, File outputDir) throws IOException { + log.log(LogLevel.DEBUG, "Decompressing '" + inputFile + "' into '" + outputDir + "'"); + ArchiveInputStream ais = new TarArchiveInputStream(new GZIPInputStream(new FileInputStream(inputFile))); + decompress(ais, outputDir); + ais.close(); + } + + private static void decompress(ArchiveInputStream archiveInputStream, File outputFile) throws IOException { + int entries = 0; + ArchiveEntry entry; + while ((entry = archiveInputStream.getNextEntry()) != null) { + log.log(LogLevel.DEBUG, "Unpacking " + entry.getName()); + File outFile = new File(outputFile, entry.getName()); + if (entry.isDirectory()) { + if (!(outFile.exists() && outFile.isDirectory())) { + log.log(LogLevel.DEBUG, "Creating dir: " + outFile.getAbsolutePath()); + if (!outFile.mkdirs()) { + log.log(LogLevel.WARNING, "Could not create dir " + entry.getName()); + } + } + } else { + // Create parent dir if necessary + File parent = new File(outFile.getParent()); + if (!parent.exists() && !parent.mkdirs()) { + log.log(LogLevel.WARNING, "Could not create dir " + parent.getAbsolutePath()); + } + FileOutputStream fos = new FileOutputStream(outFile); + ByteStreams.copy(archiveInputStream, fos); + fos.close(); + } + entries++; + } + if (entries == 0) { + log.log(LogLevel.WARNING, "Not able to read any entries from " + outputFile.getName()); + } + } + + private static void createArchiveFile(ArchiveOutputStream archiveOutputStream, File baseDir, List<File> inputFiles) throws IOException { + inputFiles.forEach(file -> { + try { + writeFileToTar(archiveOutputStream, baseDir, file); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + archiveOutputStream.close(); + } + + private static void writeFileToTar(ArchiveOutputStream taos, File baseDir, File file) throws IOException { + log.log(LogLevel.DEBUG, "Adding file to tar: " + baseDir.toPath().relativize(file.toPath()).toString()); + taos.putArchiveEntry(taos.createArchiveEntry(file, baseDir.toPath().relativize(file.toPath()).toString())); + ByteStreams.copy(new FileInputStream(file), taos); + taos.closeArchiveEntry(); + } +} + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index fde410bc8d7..9fe5eec54ff 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -66,8 +65,8 @@ public class FileDownloader { fileReferences.forEach(this::queueForDownload); } - public void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - fileReferenceDownloader.receiveFile(fileReference, filename, content, xxHash); + void receiveFile(FileReferenceData fileReferenceData) { + fileReferenceDownloader.receiveFile(fileReferenceData); } double downloadStatus(FileReference fileReference) { @@ -85,10 +84,6 @@ public class FileDownloader { private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) { File[] files = directory.listFiles(); if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { - if (files.length != 1) { - throw new RuntimeException("More than one file in '" + fileReference.value() + - "', expected only one, unable to proceed"); - } File file = files[0]; if (!file.exists()) { throw new RuntimeException("File with reference '" + fileReference.value() + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 036c3157998..1ac3a1bd7df 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; @@ -33,7 +33,7 @@ public class FileReceiver { private final File downloadDirectory; private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { + FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { this.supervisor = supervisor; this.downloader = downloader; this.downloadDirectory = downloadDirectory; @@ -68,14 +68,15 @@ public class FileReceiver { .paramDesc(4, "error-description", "Error description.") .returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise")); // Temporary method until we have chunking - methods.add(new Method(RECEIVE_METHOD, "ssxlis", "i", handler, "receiveFile") + methods.add(new Method(RECEIVE_METHOD, "sssxlis", "i", handler, "receiveFile") .methodDesc("receive file reference content") .paramDesc(0, "file reference", "file reference to download") .paramDesc(1, "filename", "filename") - .paramDesc(2, "content", "array of bytes") - .paramDesc(3, "hash", "xx64hash of the file content") - .paramDesc(4, "errorcode", "Error code. 0 if none") - .paramDesc(5, "error-description", "Error description.") + .paramDesc(2, "type", "'file' or 'compressed'") + .paramDesc(3, "content", "array of bytes") + .paramDesc(4, "hash", "xx64hash of the file content") + .paramDesc(5, "errorcode", "Error code. 0 if none") + .paramDesc(6, "error-description", "Error description.") .returnDesc(0, "ret", "0 if success, 1 otherwise")); return methods; } @@ -84,15 +85,16 @@ public class FileReceiver { public final void receiveFile(Request req) { FileReference fileReference = new FileReference(req.parameters().get(0).asString()); String filename = req.parameters().get(1).asString(); - byte[] content = req.parameters().get(2).asData(); - long xxhash = req.parameters().get(3).asInt64(); - int errorCode = req.parameters().get(4).asInt32(); - String errorDescription = req.parameters().get(5).asString(); + String type = req.parameters().get(2).asString(); + byte[] content = req.parameters().get(3).asData(); + long xxhash = req.parameters().get(4).asInt64(); + int errorCode = req.parameters().get(5).asInt32(); + String errorDescription = req.parameters().get(6).asString(); if (errorCode == 0) { // TODO: Remove when system test works log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); - receiveFile(fileReference, filename, content, xxhash); + receiveFile(new FileReferenceData(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash)); req.returnValues().add(new Int32Value(0)); } else { log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); @@ -101,19 +103,30 @@ public class FileReceiver { } } - void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(content), 0); - if (xxHashFromContent != xxHash) - throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + xxHash + ")"); + void receiveFile(FileReferenceData fileReferenceData) { + long xxHashFromContent = hasher.hash(ByteBuffer.wrap(fileReferenceData.content()), 0); + if (xxHashFromContent != fileReferenceData.xxhash()) + throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash()+ ")"); - File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value()); + // file might be a directory (and then type is compressed) + File file = new File(fileReferenceDir, fileReferenceData.filename()); try { - File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), filename); - Files.write(tempFile.toPath(), content); - Files.createDirectories(fileReferenceDir.toPath()); - File file = new File(fileReferenceDir, filename); - moveFileToDestination(tempFile, file); - downloader.completedDownloading(fileReference, file); + File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), fileReferenceData.filename()); + Files.write(tempFile.toPath(), fileReferenceData.content()); + + // Unpack if necessary + if (fileReferenceData.type() == FileReferenceData.Type.compressed) { + File decompressedDir = Files.createTempDirectory("decompressed").toFile(); + log.log(LogLevel.DEBUG, "Compressed file, unpacking " + tempFile + " to " + decompressedDir); + CompressedFileReference.decompress(tempFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); + } else { + log.log(LogLevel.DEBUG, "Uncompressed file, moving to " + file.getAbsolutePath()); + Files.createDirectories(fileReferenceDir.toPath()); + moveFileToDestination(tempFile, file); + } + downloader.completedDownloading(fileReferenceData.fileReference(), file); } catch (IOException e) { log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e); throw new RuntimeException("Failed writing file: ", e); @@ -123,7 +136,7 @@ public class FileReceiver { private void moveFileToDestination(File tempFile, File destination) { try { Files.move(tempFile.toPath(), destination.toPath()); - log.log(LogLevel.INFO, "Data written to " + destination.getAbsolutePath()); + log.log(LogLevel.INFO, "File moved from " + tempFile.getAbsolutePath()+ " to " + destination.getAbsolutePath()); } catch (FileAlreadyExistsException e) { // Don't fail if it already exists (we might get the file from several config servers when retrying, servers are down etc. // so it might be written already) @@ -135,14 +148,16 @@ public class FileReceiver { } } - @SuppressWarnings({"UnusedDeclaration"}) + @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileMeta(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); } + @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFilePart(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); } + @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileEof(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java new file mode 100644 index 00000000000..6272390f5cb --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java @@ -0,0 +1,61 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import net.jpountz.xxhash.XXHashFactory; + +import java.nio.ByteBuffer; + +/** + * Utility class for a file reference with data and metadata + * + * @author hmusum + */ +public class FileReferenceData { + + public enum Type {file, compressed} + + + + private final FileReference fileReference; + private final String filename; + private final Type type; + private final byte[] content; + private final long xxhash; + + public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content) { + this(fileReference, filename, type, content, XXHashFactory.fastestInstance().hash64().hash(ByteBuffer.wrap(content), 0)); + } + + public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content, long xxhash) { + this.fileReference = fileReference; + this.filename = filename; + this.type = type; + this.content = content; + this.xxhash = xxhash; + } + + public static FileReferenceData empty(FileReference fileReference, String filename) { + return new FileReferenceData(fileReference, filename, FileReferenceData.Type.file, new byte[0], 0); + } + + public FileReference fileReference() { + return fileReference; + } + + public String filename() { + return filename; + } + + public Type type() { + return type; + } + + public byte[] content() { + return content; + } + + public long xxhash() { + return xxhash; + } +} diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 4c9c37dd6da..b51a4b68212 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -89,8 +89,8 @@ public class FileReferenceDownloader { downloadQueue.add(fileReferenceDownload); } - void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - fileReceiver.receiveFile(fileReference, filename, content, xxHash); + void receiveFile(FileReferenceData fileReferenceData) { + fileReceiver.receiveFile(fileReferenceData); } synchronized Set<FileReference> queuedDownloads() { diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 278c46dab8b..1c9e8cdb91b 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -13,15 +13,13 @@ import com.yahoo.jrt.Transport; import com.yahoo.text.Utf8; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -35,8 +33,6 @@ import static org.junit.Assert.fail; public class FileDownloaderTest { - private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - private MockConnection connection; private FileDownloader fileDownloader; private File downloadDir; @@ -102,7 +98,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, "some other content"); + receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -113,6 +109,40 @@ public class FileDownloaderTest { // Verify download status when downloaded assertDownloadStatus(fileDownloader, fileReference, 100.0); } + + { + // fileReference does not exist on disk, needs to be downloaded, is compressed data + + FileReference fileReference = new FileReference("fileReferenceToDirWithManyFiles"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status + assertDownloadStatus(fileDownloader, fileReference, 0.0); + + // Receives fileReference, should return and make it available to caller + String filename = "abc.tar.gz"; + Path tempPath = Files.createTempDirectory("dir"); + File subdir = new File(tempPath.toFile(), "subdir"); + File fooFile = new File(subdir, "foo"); + IOUtils.writeFile(fooFile, "foo", false); + File barFile = new File(subdir, "bar"); + IOUtils.writeFile(barFile, "bar", false); + + File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); + byte[] tarredContent = IOUtils.readFileBytes(tarFile); + receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent); + Optional<File> downloadedFile = fileDownloader.getFile(fileReference); + + assertTrue(downloadedFile.isPresent()); + File downloadedFoo = new File(fileReferenceFullPath, tempPath.relativize(fooFile.toPath()).toString()); + File downloadedBar = new File(fileReferenceFullPath, tempPath.relativize(barFile.toPath()).toString()); + assertEquals("foo", IOUtils.readFile(downloadedFoo)); + assertEquals("bar", IOUtils.readFile(downloadedBar)); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } } @Test @@ -133,7 +163,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, "some other content"); + receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional<File> downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -168,7 +198,7 @@ public class FileDownloaderTest { public void receiveFile() throws IOException { FileReference foo = new FileReference("foo"); String filename = "foo.jar"; - receiveFile(foo, filename, "content"); + receiveFile(foo, filename, FileReferenceData.Type.file, "content"); File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); assertEquals("content", IOUtils.readFile(downloadedFile)); } @@ -187,10 +217,12 @@ public class FileDownloaderTest { assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); } - private void receiveFile(FileReference fileReference, String filename, String content) { - byte[] contentBytes = Utf8.toBytes(content); - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(contentBytes), 0); - fileDownloader.receiveFile(fileReference, filename, contentBytes, xxHashFromContent); + private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) { + receiveFile(fileReference, filename, type, Utf8.toBytes(content)); + } + + private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) { + fileDownloader.receiveFile(new FileReferenceData(fileReference, filename, type, content)); } private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { |