From 423a955e4a975d4f1dde550df2cea018882d4035 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Thu, 7 Dec 2017 13:18:16 +0100 Subject: Handle that a file reference is a directory with many files Compress on the fly if asked for such a file reference --- .../server/filedistribution/FileDirectory.java | 29 +++--- .../config/server/filedistribution/FileServer.java | 34 +++++-- .../yahoo/vespa/config/server/rpc/RpcServer.java | 20 ++-- .../server/filedistribution/FileDirectoryTest.java | 69 +++++++++++++ .../server/filedistribution/FileServerTest.java | 31 ++++-- filedistribution/pom.xml | 4 + .../filedistribution/CompressedFileReference.java | 111 +++++++++++++++++++++ .../vespa/filedistribution/FileDownloader.java | 9 +- .../yahoo/vespa/filedistribution/FileReceiver.java | 65 +++++++----- .../vespa/filedistribution/FileReferenceData.java | 61 +++++++++++ .../filedistribution/FileReferenceDownloader.java | 4 +- .../vespa/filedistribution/FileDownloaderTest.java | 56 ++++++++--- 12 files changed, 403 insertions(+), 90 deletions(-) create mode 100644 configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java create mode 100644 filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java create mode 100644 filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java index d8a560c6159..e991341b616 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java @@ -35,7 +35,7 @@ public class FileDirectory { try { ensureRootExist(); } catch (IllegalArgumentException e) { - log.warning("Failed creating directory in constructor, will retry on demand : " + e.toString()); + log.log(LogLevel.WARNING, "Failed creating directory in constructor, will retry on demand : " + e.toString()); } } @@ -70,12 +70,8 @@ public class FileDirectory { throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory."); } File [] files = dir.listFiles(new Filter()); - if (files.length != 1) { - StringBuilder msg = new StringBuilder(); - for (File f: files) { - msg.append(f.getName()).append("\n"); - } - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain exactly one file, but [" + msg.toString() + "]"); + if (files == null || files.length == 0) { + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain any files"); } return files[0]; } @@ -96,25 +92,28 @@ public class FileDirectory { } } - public FileReference addFile(File source, FileReference reference) { + FileReference addFile(File source, FileReference reference) { ensureRootExist(); try { logfileInfo(source); File destinationDir = new File(root, reference.value()); + Path tempDestinationDir = Files.createTempDirectory(root.toPath(), "writing"); + File destination = new File(tempDestinationDir.toFile(), source.getName()); if (!destinationDir.exists()) { destinationDir.mkdir(); - Path tempDestinationDir = Files.createTempDirectory(root.toPath(), "writing"); - File destination = new File(tempDestinationDir.toFile(), source.getName()); - if (source.isDirectory()) - IOUtils.copyDirectory(source, destination); - else + log.log(LogLevel.DEBUG, "file reference ' " + reference.value() + "', source: " + source.getAbsolutePath() ); + if (source.isDirectory()) { + log.log(LogLevel.DEBUG, "Copying source " + source.getAbsolutePath() + " to " + destination.getAbsolutePath()); + IOUtils.copyDirectory(source, destination, -1); + } else copyFile(source, destination); if (!destinationDir.exists()) { + log.log(LogLevel.DEBUG, "Moving from " + tempDestinationDir + " to " + destinationDir.getAbsolutePath()); if ( ! tempDestinationDir.toFile().renameTo(destinationDir)) { - log.warning("Failed moving '" + tempDestinationDir.toFile().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'."); + log.log(LogLevel.WARNING, "Failed moving '" + tempDestinationDir.toFile().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'."); } } else { - IOUtils.copyDirectory(tempDestinationDir.toFile(), destinationDir, 1); + IOUtils.copyDirectory(tempDestinationDir.toFile(), destinationDir, -1); } IOUtils.recursiveDeleteDir(tempDestinationDir.toFile()); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 9316a9a5c8e..958f26632ef 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -14,7 +14,9 @@ import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.config.server.ConfigServerSpec; +import com.yahoo.vespa.filedistribution.CompressedFileReference; import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileReferenceData; import java.io.File; import java.io.IOException; @@ -26,6 +28,7 @@ import java.util.stream.Collectors; public class FileServer { private static final Logger log = Logger.getLogger(FileServer.class.getName()); + private final FileDirectory root; private final ExecutorService executor; private final FileDownloader downloader; @@ -43,7 +46,7 @@ public class FileServer { } public interface Receiver { - void receive(FileReference reference, String filename, byte [] content, ReplayStatus status); + void receive(FileReferenceData fileData, ReplayStatus status); } @Inject @@ -87,22 +90,39 @@ public class FileServer { File file = root.getFile(reference); // TODO remove once verified in system tests. log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'"); - byte [] blob = new byte [0]; boolean success = false; String errorDescription = "OK"; + FileReferenceData fileData = FileReferenceData.empty(reference, file.getName()); try { - blob = IOUtils.readFileBytes(file); + fileData = readFileReferenceData(reference); success = true; } catch (IOException e) { - errorDescription = "For file reference '" + reference.value() + "' I failed reading file '" + file.getAbsolutePath() + "'"; - log.warning(errorDescription + "for sending to '" + target.toString() + "'. " + e.toString()); + errorDescription = "For file reference '" + reference.value() + "': failed reading file '" + file.getAbsolutePath() + "'"; + log.warning(errorDescription + " for sending to '" + target.toString() + "'. " + e.toString()); } - target.receive(reference, file.getName(), blob, - new ReplayStatus(success ? 0 : 1, success ? "OK" : errorDescription)); + + target.receive(fileData, new ReplayStatus(success ? 0 : 1, success ? "OK" : errorDescription)); // TODO remove once verified in system tests. log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'"); } + + private FileReferenceData readFileReferenceData(FileReference reference) throws IOException { + File file = root.getFile(reference); + + byte[] blob; + FileReferenceData.Type type; + if (file.isDirectory()) { + type = FileReferenceData.Type.compressed; + blob = CompressedFileReference.compress(file.getParentFile()); + } else { + type = FileReferenceData.Type.file; + blob = IOUtils.readFileBytes(file); + } + + return new FileReferenceData(reference, file.getName(), type, blob); + } + public void download(FileReference fileReference) { downloader.getFile(fileReference); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index d17cdf722ea..5c50fbfc31b 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -41,10 +41,8 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory; import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider; import com.yahoo.vespa.config.server.tenant.TenantListener; import com.yahoo.vespa.config.server.tenant.Tenants; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; +import com.yahoo.vespa.filedistribution.FileReferenceData; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -438,19 +436,19 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { } @Override - public void receive(FileReference reference, String filename, byte [] content, FileServer.ReplayStatus status) { - XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { Request fileBlob = new Request("filedistribution.receiveFile"); - fileBlob.parameters().add(new StringValue(reference.value())); - fileBlob.parameters().add(new StringValue(filename)); - fileBlob.parameters().add(new DataValue(content)); - fileBlob.parameters().add(new Int64Value(hasher.hash(ByteBuffer.wrap(content), 0))); + fileBlob.parameters().add(new StringValue(fileData.fileReference().value())); + fileBlob.parameters().add(new StringValue(fileData.filename())); + fileBlob.parameters().add(new StringValue(fileData.type().name())); + fileBlob.parameters().add(new DataValue(fileData.content())); + fileBlob.parameters().add(new Int64Value(fileData.xxhash())); fileBlob.parameters().add(new Int32Value(status.getCode())); fileBlob.parameters().add(new StringValue(status.getDescription())); target.invokeSync(fileBlob, 600); if (fileBlob.isError()) { - log.warning("Failed delivering reference '" + reference.value() + "' with file '" + filename + "' to " + - target.toString() + " with error : '" + fileBlob.errorMessage() + "'."); + log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + + target.toString() + " with error: '" + fileBlob.errorMessage() + "'."); } } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java new file mode 100644 index 00000000000..ad807f9527f --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java @@ -0,0 +1,69 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.io.IOUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertTrue; + +public class FileDirectoryTest { + + private FileDirectory fileDirectory; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void setup() { + fileDirectory = new FileDirectory(temporaryFolder.getRoot()); + } + + @Test + public void requireThatFileReferenceWithFilesWorks() throws IOException { + FileReference foo = createFile("foo"); + FileReference bar = createFile("bar"); + + assertTrue(fileDirectory.getFile(foo).exists()); + assertTrue(fileDirectory.getFile(bar).exists()); + } + + + @Test + public void requireThatFileReferenceWithSubDirectoriesWorks() throws IOException { + FileDirectory fileDirectory = new FileDirectory(temporaryFolder.getRoot()); + + FileReference foo = createFileInSubDir("subdir", "foo"); + FileReference bar = createFileInSubDir("subdir", "bar"); + + assertTrue(fileDirectory.getFile(foo).exists()); + assertTrue(fileDirectory.getFile(bar).exists()); + } + + // Content in created file is equal to the filename string + private FileReference createFile(String filename) throws IOException { + File file = temporaryFolder.newFile(filename); + IOUtils.writeFile(file, filename, false); + return fileDirectory.addFile(file); + } + + private FileReference createFileInSubDir(String subdirName, String filename) throws IOException { + File subDirectory = new File(temporaryFolder.getRoot(), subdirName); + if (!subDirectory.exists()) + subDirectory.mkdirs(); + File file = new File(subDirectory, filename); + IOUtils.writeFile(file, filename, false); + return fileDirectory.addFile(file); + } + + +} + + diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 09260987ac0..5fcaee6e590 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -2,9 +2,9 @@ package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.cloud.config.ConfigserverConfig; -import com.yahoo.config.FileReference; import com.yahoo.io.IOUtils; import com.yahoo.net.HostName; +import com.yahoo.vespa.filedistribution.FileReferenceData; import org.junit.Test; import java.io.File; @@ -35,7 +35,7 @@ public class FileServerTest { } @Test - public void requireThatExistingFileCanbeFound() throws IOException { + public void requireThatExistingFileCanBeFound() throws IOException { createCleanDir("123"); IOUtils.writeFile("123/f1", "test", true); assertTrue(fs.hasFile("123")); @@ -50,15 +50,13 @@ public class FileServerTest { cleanup(); } - private static class FileReceiver implements FileServer.Receiver { - CompletableFuture content; - FileReceiver(CompletableFuture content) { - this.content = content; - } - @Override - public void receive(FileReference reference, String filename, byte[] content, FileServer.ReplayStatus status) { - this.content.complete(content); - } + @Test + public void requireThatFileReferenceWithDirectoryCanBeFound() throws IOException { + createCleanDir("124/subdir"); + IOUtils.writeFile("124/subdir/f1", "test", false); + IOUtils.writeFile("124/subdir/f2", "test", false); + assertTrue(fs.hasFile("124/subdir")); + cleanup(); } @Test @@ -98,6 +96,17 @@ public class FileServerTest { assertEquals(1, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize()); } + private static class FileReceiver implements FileServer.Receiver { + CompletableFuture content; + FileReceiver(CompletableFuture content) { + this.content = content; + } + @Override + public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) { + this.content.complete(fileData.content()); + } + } + private void cleanup() { created.forEach((file) -> IOUtils.recursiveDeleteDir(file)); created.clear(); diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml index b0bdaff518f..d9699b700d0 100644 --- a/filedistribution/pom.xml +++ b/filedistribution/pom.xml @@ -54,6 +54,10 @@ junit test + + org.apache.commons + commons-compress + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java new file mode 100644 index 00000000000..759a859253e --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java @@ -0,0 +1,111 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.filedistribution; + +import com.google.common.io.ByteStreams; +import com.yahoo.log.LogLevel; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.ArchiveInputStream; +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Utility class for compressing and decompressing files used in a file reference + * + * @author hmusum + */ +public class CompressedFileReference { + + private static final Logger log = Logger.getLogger(CompressedFileReference.class.getName()); + private static final int recurseDepth = 100; + + public static File compress(File baseDir, List inputFiles, File outputFile) throws IOException { + ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(new FileOutputStream(outputFile))); + createArchiveFile(archiveOutputStream, baseDir, inputFiles); + return outputFile; + } + + public static byte[] compress(File directory) throws IOException { + return compress(directory, Files.find(Paths.get(directory.getAbsolutePath()), + recurseDepth, + (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile).collect(Collectors.toList())); + } + + public static byte[] compress(File baseDir, List inputFiles) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(out)); + createArchiveFile(archiveOutputStream, baseDir, inputFiles); + return out.toByteArray(); + } + + static void decompress(File inputFile, File outputDir) throws IOException { + log.log(LogLevel.DEBUG, "Decompressing '" + inputFile + "' into '" + outputDir + "'"); + ArchiveInputStream ais = new TarArchiveInputStream(new GZIPInputStream(new FileInputStream(inputFile))); + decompress(ais, outputDir); + ais.close(); + } + + private static void decompress(ArchiveInputStream archiveInputStream, File outputFile) throws IOException { + int entries = 0; + ArchiveEntry entry; + while ((entry = archiveInputStream.getNextEntry()) != null) { + log.log(LogLevel.DEBUG, "Unpacking " + entry.getName()); + File outFile = new File(outputFile, entry.getName()); + if (entry.isDirectory()) { + if (!(outFile.exists() && outFile.isDirectory())) { + log.log(LogLevel.DEBUG, "Creating dir: " + outFile.getAbsolutePath()); + if (!outFile.mkdirs()) { + log.log(LogLevel.WARNING, "Could not create dir " + entry.getName()); + } + } + } else { + // Create parent dir if necessary + File parent = new File(outFile.getParent()); + if (!parent.exists() && !parent.mkdirs()) { + log.log(LogLevel.WARNING, "Could not create dir " + parent.getAbsolutePath()); + } + FileOutputStream fos = new FileOutputStream(outFile); + ByteStreams.copy(archiveInputStream, fos); + fos.close(); + } + entries++; + } + if (entries == 0) { + log.log(LogLevel.WARNING, "Not able to read any entries from " + outputFile.getName()); + } + } + + private static void createArchiveFile(ArchiveOutputStream archiveOutputStream, File baseDir, List inputFiles) throws IOException { + inputFiles.forEach(file -> { + try { + writeFileToTar(archiveOutputStream, baseDir, file); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + archiveOutputStream.close(); + } + + private static void writeFileToTar(ArchiveOutputStream taos, File baseDir, File file) throws IOException { + log.log(LogLevel.DEBUG, "Adding file to tar: " + baseDir.toPath().relativize(file.toPath()).toString()); + taos.putArchiveEntry(taos.createArchiveEntry(file, baseDir.toPath().relativize(file.toPath()).toString())); + ByteStreams.copy(new FileInputStream(file), taos); + taos.closeArchiveEntry(); + } +} + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index fde410bc8d7..9fe5eec54ff 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -66,8 +65,8 @@ public class FileDownloader { fileReferences.forEach(this::queueForDownload); } - public void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - fileReferenceDownloader.receiveFile(fileReference, filename, content, xxHash); + void receiveFile(FileReferenceData fileReferenceData) { + fileReferenceDownloader.receiveFile(fileReferenceData); } double downloadStatus(FileReference fileReference) { @@ -85,10 +84,6 @@ public class FileDownloader { private Optional getFileFromFileSystem(FileReference fileReference, File directory) { File[] files = directory.listFiles(); if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) { - if (files.length != 1) { - throw new RuntimeException("More than one file in '" + fileReference.value() + - "', expected only one, unable to proceed"); - } File file = files[0]; if (!file.exists()) { throw new RuntimeException("File with reference '" + fileReference.value() + diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 036c3157998..1ac3a1bd7df 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.filedistribution; @@ -33,7 +33,7 @@ public class FileReceiver { private final File downloadDirectory; private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { + FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { this.supervisor = supervisor; this.downloader = downloader; this.downloadDirectory = downloadDirectory; @@ -68,14 +68,15 @@ public class FileReceiver { .paramDesc(4, "error-description", "Error description.") .returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise")); // Temporary method until we have chunking - methods.add(new Method(RECEIVE_METHOD, "ssxlis", "i", handler, "receiveFile") + methods.add(new Method(RECEIVE_METHOD, "sssxlis", "i", handler, "receiveFile") .methodDesc("receive file reference content") .paramDesc(0, "file reference", "file reference to download") .paramDesc(1, "filename", "filename") - .paramDesc(2, "content", "array of bytes") - .paramDesc(3, "hash", "xx64hash of the file content") - .paramDesc(4, "errorcode", "Error code. 0 if none") - .paramDesc(5, "error-description", "Error description.") + .paramDesc(2, "type", "'file' or 'compressed'") + .paramDesc(3, "content", "array of bytes") + .paramDesc(4, "hash", "xx64hash of the file content") + .paramDesc(5, "errorcode", "Error code. 0 if none") + .paramDesc(6, "error-description", "Error description.") .returnDesc(0, "ret", "0 if success, 1 otherwise")); return methods; } @@ -84,15 +85,16 @@ public class FileReceiver { public final void receiveFile(Request req) { FileReference fileReference = new FileReference(req.parameters().get(0).asString()); String filename = req.parameters().get(1).asString(); - byte[] content = req.parameters().get(2).asData(); - long xxhash = req.parameters().get(3).asInt64(); - int errorCode = req.parameters().get(4).asInt32(); - String errorDescription = req.parameters().get(5).asString(); + String type = req.parameters().get(2).asString(); + byte[] content = req.parameters().get(3).asData(); + long xxhash = req.parameters().get(4).asInt64(); + int errorCode = req.parameters().get(5).asInt32(); + String errorDescription = req.parameters().get(6).asString(); if (errorCode == 0) { // TODO: Remove when system test works log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); - receiveFile(fileReference, filename, content, xxhash); + receiveFile(new FileReferenceData(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash)); req.returnValues().add(new Int32Value(0)); } else { log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); @@ -101,19 +103,30 @@ public class FileReceiver { } } - void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(content), 0); - if (xxHashFromContent != xxHash) - throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + xxHash + ")"); + void receiveFile(FileReferenceData fileReferenceData) { + long xxHashFromContent = hasher.hash(ByteBuffer.wrap(fileReferenceData.content()), 0); + if (xxHashFromContent != fileReferenceData.xxhash()) + throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash()+ ")"); - File fileReferenceDir = new File(downloadDirectory, fileReference.value()); + File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value()); + // file might be a directory (and then type is compressed) + File file = new File(fileReferenceDir, fileReferenceData.filename()); try { - File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), filename); - Files.write(tempFile.toPath(), content); - Files.createDirectories(fileReferenceDir.toPath()); - File file = new File(fileReferenceDir, filename); - moveFileToDestination(tempFile, file); - downloader.completedDownloading(fileReference, file); + File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), fileReferenceData.filename()); + Files.write(tempFile.toPath(), fileReferenceData.content()); + + // Unpack if necessary + if (fileReferenceData.type() == FileReferenceData.Type.compressed) { + File decompressedDir = Files.createTempDirectory("decompressed").toFile(); + log.log(LogLevel.DEBUG, "Compressed file, unpacking " + tempFile + " to " + decompressedDir); + CompressedFileReference.decompress(tempFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); + } else { + log.log(LogLevel.DEBUG, "Uncompressed file, moving to " + file.getAbsolutePath()); + Files.createDirectories(fileReferenceDir.toPath()); + moveFileToDestination(tempFile, file); + } + downloader.completedDownloading(fileReferenceData.fileReference(), file); } catch (IOException e) { log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e); throw new RuntimeException("Failed writing file: ", e); @@ -123,7 +136,7 @@ public class FileReceiver { private void moveFileToDestination(File tempFile, File destination) { try { Files.move(tempFile.toPath(), destination.toPath()); - log.log(LogLevel.INFO, "Data written to " + destination.getAbsolutePath()); + log.log(LogLevel.INFO, "File moved from " + tempFile.getAbsolutePath()+ " to " + destination.getAbsolutePath()); } catch (FileAlreadyExistsException e) { // Don't fail if it already exists (we might get the file from several config servers when retrying, servers are down etc. // so it might be written already) @@ -135,14 +148,16 @@ public class FileReceiver { } } - @SuppressWarnings({"UnusedDeclaration"}) + @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileMeta(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); } + @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFilePart(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); } + @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileEof(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java new file mode 100644 index 00000000000..6272390f5cb --- /dev/null +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java @@ -0,0 +1,61 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import net.jpountz.xxhash.XXHashFactory; + +import java.nio.ByteBuffer; + +/** + * Utility class for a file reference with data and metadata + * + * @author hmusum + */ +public class FileReferenceData { + + public enum Type {file, compressed} + + + + private final FileReference fileReference; + private final String filename; + private final Type type; + private final byte[] content; + private final long xxhash; + + public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content) { + this(fileReference, filename, type, content, XXHashFactory.fastestInstance().hash64().hash(ByteBuffer.wrap(content), 0)); + } + + public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content, long xxhash) { + this.fileReference = fileReference; + this.filename = filename; + this.type = type; + this.content = content; + this.xxhash = xxhash; + } + + public static FileReferenceData empty(FileReference fileReference, String filename) { + return new FileReferenceData(fileReference, filename, FileReferenceData.Type.file, new byte[0], 0); + } + + public FileReference fileReference() { + return fileReference; + } + + public String filename() { + return filename; + } + + public Type type() { + return type; + } + + public byte[] content() { + return content; + } + + public long xxhash() { + return xxhash; + } +} diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 4c9c37dd6da..b51a4b68212 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -89,8 +89,8 @@ public class FileReferenceDownloader { downloadQueue.add(fileReferenceDownload); } - void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) { - fileReceiver.receiveFile(fileReference, filename, content, xxHash); + void receiveFile(FileReferenceData fileReferenceData) { + fileReceiver.receiveFile(fileReferenceData); } synchronized Set queuedDownloads() { diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 278c46dab8b..1c9e8cdb91b 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -13,15 +13,13 @@ import com.yahoo.jrt.Transport; import com.yahoo.text.Utf8; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -35,8 +33,6 @@ import static org.junit.Assert.fail; public class FileDownloaderTest { - private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - private MockConnection connection; private FileDownloader fileDownloader; private File downloadDir; @@ -102,7 +98,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, "some other content"); + receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -113,6 +109,40 @@ public class FileDownloaderTest { // Verify download status when downloaded assertDownloadStatus(fileDownloader, fileReference, 100.0); } + + { + // fileReference does not exist on disk, needs to be downloaded, is compressed data + + FileReference fileReference = new FileReference("fileReferenceToDirWithManyFiles"); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); + assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); + + // Verify download status + assertDownloadStatus(fileDownloader, fileReference, 0.0); + + // Receives fileReference, should return and make it available to caller + String filename = "abc.tar.gz"; + Path tempPath = Files.createTempDirectory("dir"); + File subdir = new File(tempPath.toFile(), "subdir"); + File fooFile = new File(subdir, "foo"); + IOUtils.writeFile(fooFile, "foo", false); + File barFile = new File(subdir, "bar"); + IOUtils.writeFile(barFile, "bar", false); + + File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename)); + byte[] tarredContent = IOUtils.readFileBytes(tarFile); + receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent); + Optional downloadedFile = fileDownloader.getFile(fileReference); + + assertTrue(downloadedFile.isPresent()); + File downloadedFoo = new File(fileReferenceFullPath, tempPath.relativize(fooFile.toPath()).toString()); + File downloadedBar = new File(fileReferenceFullPath, tempPath.relativize(barFile.toPath()).toString()); + assertEquals("foo", IOUtils.readFile(downloadedFoo)); + assertEquals("bar", IOUtils.readFile(downloadedBar)); + + // Verify download status when downloaded + assertDownloadStatus(fileDownloader, fileReference, 100.0); + } } @Test @@ -133,7 +163,7 @@ public class FileDownloaderTest { // Receives fileReference, should return and make it available to caller String filename = "abc.jar"; - receiveFile(fileReference, filename, "some other content"); + receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content"); Optional downloadedFile = fileDownloader.getFile(fileReference); assertTrue(downloadedFile.isPresent()); @@ -168,7 +198,7 @@ public class FileDownloaderTest { public void receiveFile() throws IOException { FileReference foo = new FileReference("foo"); String filename = "foo.jar"; - receiveFile(foo, filename, "content"); + receiveFile(foo, filename, FileReferenceData.Type.file, "content"); File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); assertEquals("content", IOUtils.readFile(downloadedFile)); } @@ -187,10 +217,12 @@ public class FileDownloaderTest { assertEquals(expectedDownloadStatus, downloadStatus, 0.0001); } - private void receiveFile(FileReference fileReference, String filename, String content) { - byte[] contentBytes = Utf8.toBytes(content); - long xxHashFromContent = hasher.hash(ByteBuffer.wrap(contentBytes), 0); - fileDownloader.receiveFile(fileReference, filename, contentBytes, xxHashFromContent); + private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) { + receiveFile(fileReference, filename, type, Utf8.toBytes(content)); + } + + private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) { + fileDownloader.receiveFile(new FileReferenceData(fileReference, filename, type, content)); } private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { -- cgit v1.2.3