aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-12-07 13:18:16 +0100
committerHarald Musum <musum@oath.com>2017-12-07 13:18:16 +0100
commit423a955e4a975d4f1dde550df2cea018882d4035 (patch)
treed73dcbd41b924c20e467f5983f42cd397567074e
parent56deb93f655849712b4cc17fc69df6331e0253a3 (diff)
Handle that a file reference is a directory with many files
Compress on the fly if asked for such a file reference
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java29
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java34
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java20
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java69
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java31
-rw-r--r--filedistribution/pom.xml4
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java111
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java9
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java65
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java61
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java4
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java56
12 files changed, 403 insertions, 90 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java
index d8a560c6159..e991341b616 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java
@@ -35,7 +35,7 @@ public class FileDirectory {
try {
ensureRootExist();
} catch (IllegalArgumentException e) {
- log.warning("Failed creating directory in constructor, will retry on demand : " + e.toString());
+ log.log(LogLevel.WARNING, "Failed creating directory in constructor, will retry on demand : " + e.toString());
}
}
@@ -70,12 +70,8 @@ public class FileDirectory {
throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory.");
}
File [] files = dir.listFiles(new Filter());
- if (files.length != 1) {
- StringBuilder msg = new StringBuilder();
- for (File f: files) {
- msg.append(f.getName()).append("\n");
- }
- throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain exactly one file, but [" + msg.toString() + "]");
+ if (files == null || files.length == 0) {
+ throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain any files");
}
return files[0];
}
@@ -96,25 +92,28 @@ public class FileDirectory {
}
}
- public FileReference addFile(File source, FileReference reference) {
+ FileReference addFile(File source, FileReference reference) {
ensureRootExist();
try {
logfileInfo(source);
File destinationDir = new File(root, reference.value());
+ Path tempDestinationDir = Files.createTempDirectory(root.toPath(), "writing");
+ File destination = new File(tempDestinationDir.toFile(), source.getName());
if (!destinationDir.exists()) {
destinationDir.mkdir();
- Path tempDestinationDir = Files.createTempDirectory(root.toPath(), "writing");
- File destination = new File(tempDestinationDir.toFile(), source.getName());
- if (source.isDirectory())
- IOUtils.copyDirectory(source, destination);
- else
+ log.log(LogLevel.DEBUG, "file reference ' " + reference.value() + "', source: " + source.getAbsolutePath() );
+ if (source.isDirectory()) {
+ log.log(LogLevel.DEBUG, "Copying source " + source.getAbsolutePath() + " to " + destination.getAbsolutePath());
+ IOUtils.copyDirectory(source, destination, -1);
+ } else
copyFile(source, destination);
if (!destinationDir.exists()) {
+ log.log(LogLevel.DEBUG, "Moving from " + tempDestinationDir + " to " + destinationDir.getAbsolutePath());
if ( ! tempDestinationDir.toFile().renameTo(destinationDir)) {
- log.warning("Failed moving '" + tempDestinationDir.toFile().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'.");
+ log.log(LogLevel.WARNING, "Failed moving '" + tempDestinationDir.toFile().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'.");
}
} else {
- IOUtils.copyDirectory(tempDestinationDir.toFile(), destinationDir, 1);
+ IOUtils.copyDirectory(tempDestinationDir.toFile(), destinationDir, -1);
}
IOUtils.recursiveDeleteDir(tempDestinationDir.toFile());
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index 9316a9a5c8e..958f26632ef 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -14,7 +14,9 @@ import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.config.JRTConnectionPool;
import com.yahoo.vespa.config.server.ConfigServerSpec;
+import com.yahoo.vespa.filedistribution.CompressedFileReference;
import com.yahoo.vespa.filedistribution.FileDownloader;
+import com.yahoo.vespa.filedistribution.FileReferenceData;
import java.io.File;
import java.io.IOException;
@@ -26,6 +28,7 @@ import java.util.stream.Collectors;
public class FileServer {
private static final Logger log = Logger.getLogger(FileServer.class.getName());
+
private final FileDirectory root;
private final ExecutorService executor;
private final FileDownloader downloader;
@@ -43,7 +46,7 @@ public class FileServer {
}
public interface Receiver {
- void receive(FileReference reference, String filename, byte [] content, ReplayStatus status);
+ void receive(FileReferenceData fileData, ReplayStatus status);
}
@Inject
@@ -87,22 +90,39 @@ public class FileServer {
File file = root.getFile(reference);
// TODO remove once verified in system tests.
log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'");
- byte [] blob = new byte [0];
boolean success = false;
String errorDescription = "OK";
+ FileReferenceData fileData = FileReferenceData.empty(reference, file.getName());
try {
- blob = IOUtils.readFileBytes(file);
+ fileData = readFileReferenceData(reference);
success = true;
} catch (IOException e) {
- errorDescription = "For file reference '" + reference.value() + "' I failed reading file '" + file.getAbsolutePath() + "'";
- log.warning(errorDescription + "for sending to '" + target.toString() + "'. " + e.toString());
+ errorDescription = "For file reference '" + reference.value() + "': failed reading file '" + file.getAbsolutePath() + "'";
+ log.warning(errorDescription + " for sending to '" + target.toString() + "'. " + e.toString());
}
- target.receive(reference, file.getName(), blob,
- new ReplayStatus(success ? 0 : 1, success ? "OK" : errorDescription));
+
+ target.receive(fileData, new ReplayStatus(success ? 0 : 1, success ? "OK" : errorDescription));
// TODO remove once verified in system tests.
log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'");
}
+
+ private FileReferenceData readFileReferenceData(FileReference reference) throws IOException {
+ File file = root.getFile(reference);
+
+ byte[] blob;
+ FileReferenceData.Type type;
+ if (file.isDirectory()) {
+ type = FileReferenceData.Type.compressed;
+ blob = CompressedFileReference.compress(file.getParentFile());
+ } else {
+ type = FileReferenceData.Type.file;
+ blob = IOUtils.readFileBytes(file);
+ }
+
+ return new FileReferenceData(reference, file.getName(), type, blob);
+ }
+
public void download(FileReference fileReference) {
downloader.getFile(fileReference);
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
index d17cdf722ea..5c50fbfc31b 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java
@@ -41,10 +41,8 @@ import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory;
import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider;
import com.yahoo.vespa.config.server.tenant.TenantListener;
import com.yahoo.vespa.config.server.tenant.Tenants;
-import net.jpountz.xxhash.XXHash64;
-import net.jpountz.xxhash.XXHashFactory;
+import com.yahoo.vespa.filedistribution.FileReferenceData;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -438,19 +436,19 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener {
}
@Override
- public void receive(FileReference reference, String filename, byte [] content, FileServer.ReplayStatus status) {
- XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
+ public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
Request fileBlob = new Request("filedistribution.receiveFile");
- fileBlob.parameters().add(new StringValue(reference.value()));
- fileBlob.parameters().add(new StringValue(filename));
- fileBlob.parameters().add(new DataValue(content));
- fileBlob.parameters().add(new Int64Value(hasher.hash(ByteBuffer.wrap(content), 0)));
+ fileBlob.parameters().add(new StringValue(fileData.fileReference().value()));
+ fileBlob.parameters().add(new StringValue(fileData.filename()));
+ fileBlob.parameters().add(new StringValue(fileData.type().name()));
+ fileBlob.parameters().add(new DataValue(fileData.content()));
+ fileBlob.parameters().add(new Int64Value(fileData.xxhash()));
fileBlob.parameters().add(new Int32Value(status.getCode()));
fileBlob.parameters().add(new StringValue(status.getDescription()));
target.invokeSync(fileBlob, 600);
if (fileBlob.isError()) {
- log.warning("Failed delivering reference '" + reference.value() + "' with file '" + filename + "' to " +
- target.toString() + " with error : '" + fileBlob.errorMessage() + "'.");
+ log.warning("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " +
+ target.toString() + " with error: '" + fileBlob.errorMessage() + "'.");
}
}
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java
new file mode 100644
index 00000000000..ad807f9527f
--- /dev/null
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileDirectoryTest.java
@@ -0,0 +1,69 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.vespa.config.server.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.io.IOUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+public class FileDirectoryTest {
+
+ private FileDirectory fileDirectory;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Before
+ public void setup() {
+ fileDirectory = new FileDirectory(temporaryFolder.getRoot());
+ }
+
+ @Test
+ public void requireThatFileReferenceWithFilesWorks() throws IOException {
+ FileReference foo = createFile("foo");
+ FileReference bar = createFile("bar");
+
+ assertTrue(fileDirectory.getFile(foo).exists());
+ assertTrue(fileDirectory.getFile(bar).exists());
+ }
+
+
+ @Test
+ public void requireThatFileReferenceWithSubDirectoriesWorks() throws IOException {
+ FileDirectory fileDirectory = new FileDirectory(temporaryFolder.getRoot());
+
+ FileReference foo = createFileInSubDir("subdir", "foo");
+ FileReference bar = createFileInSubDir("subdir", "bar");
+
+ assertTrue(fileDirectory.getFile(foo).exists());
+ assertTrue(fileDirectory.getFile(bar).exists());
+ }
+
+ // Content in created file is equal to the filename string
+ private FileReference createFile(String filename) throws IOException {
+ File file = temporaryFolder.newFile(filename);
+ IOUtils.writeFile(file, filename, false);
+ return fileDirectory.addFile(file);
+ }
+
+ private FileReference createFileInSubDir(String subdirName, String filename) throws IOException {
+ File subDirectory = new File(temporaryFolder.getRoot(), subdirName);
+ if (!subDirectory.exists())
+ subDirectory.mkdirs();
+ File file = new File(subDirectory, filename);
+ IOUtils.writeFile(file, filename, false);
+ return fileDirectory.addFile(file);
+ }
+
+
+}
+
+
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
index 09260987ac0..5fcaee6e590 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
@@ -2,9 +2,9 @@
package com.yahoo.vespa.config.server.filedistribution;
import com.yahoo.cloud.config.ConfigserverConfig;
-import com.yahoo.config.FileReference;
import com.yahoo.io.IOUtils;
import com.yahoo.net.HostName;
+import com.yahoo.vespa.filedistribution.FileReferenceData;
import org.junit.Test;
import java.io.File;
@@ -35,7 +35,7 @@ public class FileServerTest {
}
@Test
- public void requireThatExistingFileCanbeFound() throws IOException {
+ public void requireThatExistingFileCanBeFound() throws IOException {
createCleanDir("123");
IOUtils.writeFile("123/f1", "test", true);
assertTrue(fs.hasFile("123"));
@@ -50,15 +50,13 @@ public class FileServerTest {
cleanup();
}
- private static class FileReceiver implements FileServer.Receiver {
- CompletableFuture<byte []> content;
- FileReceiver(CompletableFuture<byte []> content) {
- this.content = content;
- }
- @Override
- public void receive(FileReference reference, String filename, byte[] content, FileServer.ReplayStatus status) {
- this.content.complete(content);
- }
+ @Test
+ public void requireThatFileReferenceWithDirectoryCanBeFound() throws IOException {
+ createCleanDir("124/subdir");
+ IOUtils.writeFile("124/subdir/f1", "test", false);
+ IOUtils.writeFile("124/subdir/f2", "test", false);
+ assertTrue(fs.hasFile("124/subdir"));
+ cleanup();
}
@Test
@@ -98,6 +96,17 @@ public class FileServerTest {
assertEquals(1, fileServer.downloader().fileReferenceDownloader().connectionPool().getSize());
}
+ private static class FileReceiver implements FileServer.Receiver {
+ CompletableFuture<byte []> content;
+ FileReceiver(CompletableFuture<byte []> content) {
+ this.content = content;
+ }
+ @Override
+ public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
+ this.content.complete(fileData.content());
+ }
+ }
+
private void cleanup() {
created.forEach((file) -> IOUtils.recursiveDeleteDir(file));
created.clear();
diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml
index b0bdaff518f..d9699b700d0 100644
--- a/filedistribution/pom.xml
+++ b/filedistribution/pom.xml
@@ -54,6 +54,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java
new file mode 100644
index 00000000000..759a859253e
--- /dev/null
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/CompressedFileReference.java
@@ -0,0 +1,111 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.filedistribution;
+
+import com.google.common.io.ByteStreams;
+import com.yahoo.log.LogLevel;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Utility class for compressing and decompressing files used in a file reference
+ *
+ * @author hmusum
+ */
+public class CompressedFileReference {
+
+ private static final Logger log = Logger.getLogger(CompressedFileReference.class.getName());
+ private static final int recurseDepth = 100;
+
+ public static File compress(File baseDir, List<File> inputFiles, File outputFile) throws IOException {
+ ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(new FileOutputStream(outputFile)));
+ createArchiveFile(archiveOutputStream, baseDir, inputFiles);
+ return outputFile;
+ }
+
+ public static byte[] compress(File directory) throws IOException {
+ return compress(directory, Files.find(Paths.get(directory.getAbsolutePath()),
+ recurseDepth,
+ (p, basicFileAttributes) -> basicFileAttributes.isRegularFile())
+ .map(Path::toFile).collect(Collectors.toList()));
+ }
+
+ public static byte[] compress(File baseDir, List<File> inputFiles) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(new GZIPOutputStream(out));
+ createArchiveFile(archiveOutputStream, baseDir, inputFiles);
+ return out.toByteArray();
+ }
+
+ static void decompress(File inputFile, File outputDir) throws IOException {
+ log.log(LogLevel.DEBUG, "Decompressing '" + inputFile + "' into '" + outputDir + "'");
+ ArchiveInputStream ais = new TarArchiveInputStream(new GZIPInputStream(new FileInputStream(inputFile)));
+ decompress(ais, outputDir);
+ ais.close();
+ }
+
+ private static void decompress(ArchiveInputStream archiveInputStream, File outputFile) throws IOException {
+ int entries = 0;
+ ArchiveEntry entry;
+ while ((entry = archiveInputStream.getNextEntry()) != null) {
+ log.log(LogLevel.DEBUG, "Unpacking " + entry.getName());
+ File outFile = new File(outputFile, entry.getName());
+ if (entry.isDirectory()) {
+ if (!(outFile.exists() && outFile.isDirectory())) {
+ log.log(LogLevel.DEBUG, "Creating dir: " + outFile.getAbsolutePath());
+ if (!outFile.mkdirs()) {
+ log.log(LogLevel.WARNING, "Could not create dir " + entry.getName());
+ }
+ }
+ } else {
+ // Create parent dir if necessary
+ File parent = new File(outFile.getParent());
+ if (!parent.exists() && !parent.mkdirs()) {
+ log.log(LogLevel.WARNING, "Could not create dir " + parent.getAbsolutePath());
+ }
+ FileOutputStream fos = new FileOutputStream(outFile);
+ ByteStreams.copy(archiveInputStream, fos);
+ fos.close();
+ }
+ entries++;
+ }
+ if (entries == 0) {
+ log.log(LogLevel.WARNING, "Not able to read any entries from " + outputFile.getName());
+ }
+ }
+
+ private static void createArchiveFile(ArchiveOutputStream archiveOutputStream, File baseDir, List<File> inputFiles) throws IOException {
+ inputFiles.forEach(file -> {
+ try {
+ writeFileToTar(archiveOutputStream, baseDir, file);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ archiveOutputStream.close();
+ }
+
+ private static void writeFileToTar(ArchiveOutputStream taos, File baseDir, File file) throws IOException {
+ log.log(LogLevel.DEBUG, "Adding file to tar: " + baseDir.toPath().relativize(file.toPath()).toString());
+ taos.putArchiveEntry(taos.createArchiveEntry(file, baseDir.toPath().relativize(file.toPath()).toString()));
+ ByteStreams.copy(new FileInputStream(file), taos);
+ taos.closeArchiveEntry();
+ }
+}
+
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
index fde410bc8d7..9fe5eec54ff 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java
@@ -16,7 +16,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -66,8 +65,8 @@ public class FileDownloader {
fileReferences.forEach(this::queueForDownload);
}
- public void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) {
- fileReferenceDownloader.receiveFile(fileReference, filename, content, xxHash);
+ void receiveFile(FileReferenceData fileReferenceData) {
+ fileReferenceDownloader.receiveFile(fileReferenceData);
}
double downloadStatus(FileReference fileReference) {
@@ -85,10 +84,6 @@ public class FileDownloader {
private Optional<File> getFileFromFileSystem(FileReference fileReference, File directory) {
File[] files = directory.listFiles();
if (directory.exists() && directory.isDirectory() && files != null && files.length > 0) {
- if (files.length != 1) {
- throw new RuntimeException("More than one file in '" + fileReference.value() +
- "', expected only one, unable to proceed");
- }
File file = files[0];
if (!file.exists()) {
throw new RuntimeException("File with reference '" + fileReference.value() +
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 036c3157998..1ac3a1bd7df 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -1,4 +1,4 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.filedistribution;
@@ -33,7 +33,7 @@ public class FileReceiver {
private final File downloadDirectory;
private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
- public FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
+ FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
this.supervisor = supervisor;
this.downloader = downloader;
this.downloadDirectory = downloadDirectory;
@@ -68,14 +68,15 @@ public class FileReceiver {
.paramDesc(4, "error-description", "Error description.")
.returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise"));
// Temporary method until we have chunking
- methods.add(new Method(RECEIVE_METHOD, "ssxlis", "i", handler, "receiveFile")
+ methods.add(new Method(RECEIVE_METHOD, "sssxlis", "i", handler, "receiveFile")
.methodDesc("receive file reference content")
.paramDesc(0, "file reference", "file reference to download")
.paramDesc(1, "filename", "filename")
- .paramDesc(2, "content", "array of bytes")
- .paramDesc(3, "hash", "xx64hash of the file content")
- .paramDesc(4, "errorcode", "Error code. 0 if none")
- .paramDesc(5, "error-description", "Error description.")
+ .paramDesc(2, "type", "'file' or 'compressed'")
+ .paramDesc(3, "content", "array of bytes")
+ .paramDesc(4, "hash", "xx64hash of the file content")
+ .paramDesc(5, "errorcode", "Error code. 0 if none")
+ .paramDesc(6, "error-description", "Error description.")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
return methods;
}
@@ -84,15 +85,16 @@ public class FileReceiver {
public final void receiveFile(Request req) {
FileReference fileReference = new FileReference(req.parameters().get(0).asString());
String filename = req.parameters().get(1).asString();
- byte[] content = req.parameters().get(2).asData();
- long xxhash = req.parameters().get(3).asInt64();
- int errorCode = req.parameters().get(4).asInt32();
- String errorDescription = req.parameters().get(5).asString();
+ String type = req.parameters().get(2).asString();
+ byte[] content = req.parameters().get(3).asData();
+ long xxhash = req.parameters().get(4).asInt64();
+ int errorCode = req.parameters().get(5).asInt32();
+ String errorDescription = req.parameters().get(6).asString();
if (errorCode == 0) {
// TODO: Remove when system test works
log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
- receiveFile(fileReference, filename, content, xxhash);
+ receiveFile(new FileReferenceData(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash));
req.returnValues().add(new Int32Value(0));
} else {
log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
@@ -101,19 +103,30 @@ public class FileReceiver {
}
}
- void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) {
- long xxHashFromContent = hasher.hash(ByteBuffer.wrap(content), 0);
- if (xxHashFromContent != xxHash)
- throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + xxHash + ")");
+ void receiveFile(FileReferenceData fileReferenceData) {
+ long xxHashFromContent = hasher.hash(ByteBuffer.wrap(fileReferenceData.content()), 0);
+ if (xxHashFromContent != fileReferenceData.xxhash())
+ throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash()+ ")");
- File fileReferenceDir = new File(downloadDirectory, fileReference.value());
+ File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value());
+ // file might be a directory (and then type is compressed)
+ File file = new File(fileReferenceDir, fileReferenceData.filename());
try {
- File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), filename);
- Files.write(tempFile.toPath(), content);
- Files.createDirectories(fileReferenceDir.toPath());
- File file = new File(fileReferenceDir, filename);
- moveFileToDestination(tempFile, file);
- downloader.completedDownloading(fileReference, file);
+ File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), fileReferenceData.filename());
+ Files.write(tempFile.toPath(), fileReferenceData.content());
+
+ // Unpack if necessary
+ if (fileReferenceData.type() == FileReferenceData.Type.compressed) {
+ File decompressedDir = Files.createTempDirectory("decompressed").toFile();
+ log.log(LogLevel.DEBUG, "Compressed file, unpacking " + tempFile + " to " + decompressedDir);
+ CompressedFileReference.decompress(tempFile, decompressedDir);
+ moveFileToDestination(decompressedDir, fileReferenceDir);
+ } else {
+ log.log(LogLevel.DEBUG, "Uncompressed file, moving to " + file.getAbsolutePath());
+ Files.createDirectories(fileReferenceDir.toPath());
+ moveFileToDestination(tempFile, file);
+ }
+ downloader.completedDownloading(fileReferenceData.fileReference(), file);
} catch (IOException e) {
log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e);
throw new RuntimeException("Failed writing file: ", e);
@@ -123,7 +136,7 @@ public class FileReceiver {
private void moveFileToDestination(File tempFile, File destination) {
try {
Files.move(tempFile.toPath(), destination.toPath());
- log.log(LogLevel.INFO, "Data written to " + destination.getAbsolutePath());
+ log.log(LogLevel.INFO, "File moved from " + tempFile.getAbsolutePath()+ " to " + destination.getAbsolutePath());
} catch (FileAlreadyExistsException e) {
// Don't fail if it already exists (we might get the file from several config servers when retrying, servers are down etc.
// so it might be written already)
@@ -135,14 +148,16 @@ public class FileReceiver {
}
}
- @SuppressWarnings({"UnusedDeclaration"})
+ @SuppressWarnings({"UnusedDeclaration"})
public final void receiveFileMeta(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
}
+
@SuppressWarnings({"UnusedDeclaration"})
public final void receiveFilePart(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
}
+
@SuppressWarnings({"UnusedDeclaration"})
public final void receiveFileEof(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
new file mode 100644
index 00000000000..6272390f5cb
--- /dev/null
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
@@ -0,0 +1,61 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.filedistribution;
+
+import com.yahoo.config.FileReference;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utility class for a file reference with data and metadata
+ *
+ * @author hmusum
+ */
+public class FileReferenceData {
+
+ public enum Type {file, compressed}
+
+
+
+ private final FileReference fileReference;
+ private final String filename;
+ private final Type type;
+ private final byte[] content;
+ private final long xxhash;
+
+ public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content) {
+ this(fileReference, filename, type, content, XXHashFactory.fastestInstance().hash64().hash(ByteBuffer.wrap(content), 0));
+ }
+
+ public FileReferenceData(FileReference fileReference, String filename, Type type, byte[] content, long xxhash) {
+ this.fileReference = fileReference;
+ this.filename = filename;
+ this.type = type;
+ this.content = content;
+ this.xxhash = xxhash;
+ }
+
+ public static FileReferenceData empty(FileReference fileReference, String filename) {
+ return new FileReferenceData(fileReference, filename, FileReferenceData.Type.file, new byte[0], 0);
+ }
+
+ public FileReference fileReference() {
+ return fileReference;
+ }
+
+ public String filename() {
+ return filename;
+ }
+
+ public Type type() {
+ return type;
+ }
+
+ public byte[] content() {
+ return content;
+ }
+
+ public long xxhash() {
+ return xxhash;
+ }
+}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 4c9c37dd6da..b51a4b68212 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -89,8 +89,8 @@ public class FileReferenceDownloader {
downloadQueue.add(fileReferenceDownload);
}
- void receiveFile(FileReference fileReference, String filename, byte[] content, long xxHash) {
- fileReceiver.receiveFile(fileReference, filename, content, xxHash);
+ void receiveFile(FileReferenceData fileReferenceData) {
+ fileReceiver.receiveFile(fileReferenceData);
}
synchronized Set<FileReference> queuedDownloads() {
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
index 278c46dab8b..1c9e8cdb91b 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java
@@ -13,15 +13,13 @@ import com.yahoo.jrt.Transport;
import com.yahoo.text.Utf8;
import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
-import net.jpountz.xxhash.XXHash64;
-import net.jpountz.xxhash.XXHashFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -35,8 +33,6 @@ import static org.junit.Assert.fail;
public class FileDownloaderTest {
- private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
-
private MockConnection connection;
private FileDownloader fileDownloader;
private File downloadDir;
@@ -102,7 +98,7 @@ public class FileDownloaderTest {
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
- receiveFile(fileReference, filename, "some other content");
+ receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content");
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
@@ -113,6 +109,40 @@ public class FileDownloaderTest {
// Verify download status when downloaded
assertDownloadStatus(fileDownloader, fileReference, 100.0);
}
+
+ {
+ // fileReference does not exist on disk, needs to be downloaded, is compressed data
+
+ FileReference fileReference = new FileReference("fileReferenceToDirWithManyFiles");
+ File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference);
+ assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent());
+
+ // Verify download status
+ assertDownloadStatus(fileDownloader, fileReference, 0.0);
+
+ // Receives fileReference, should return and make it available to caller
+ String filename = "abc.tar.gz";
+ Path tempPath = Files.createTempDirectory("dir");
+ File subdir = new File(tempPath.toFile(), "subdir");
+ File fooFile = new File(subdir, "foo");
+ IOUtils.writeFile(fooFile, "foo", false);
+ File barFile = new File(subdir, "bar");
+ IOUtils.writeFile(barFile, "bar", false);
+
+ File tarFile = CompressedFileReference.compress(tempPath.toFile(), Arrays.asList(fooFile, barFile), new File(tempPath.toFile(), filename));
+ byte[] tarredContent = IOUtils.readFileBytes(tarFile);
+ receiveFile(fileReference, filename, FileReferenceData.Type.compressed, tarredContent);
+ Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
+
+ assertTrue(downloadedFile.isPresent());
+ File downloadedFoo = new File(fileReferenceFullPath, tempPath.relativize(fooFile.toPath()).toString());
+ File downloadedBar = new File(fileReferenceFullPath, tempPath.relativize(barFile.toPath()).toString());
+ assertEquals("foo", IOUtils.readFile(downloadedFoo));
+ assertEquals("bar", IOUtils.readFile(downloadedBar));
+
+ // Verify download status when downloaded
+ assertDownloadStatus(fileDownloader, fileReference, 100.0);
+ }
}
@Test
@@ -133,7 +163,7 @@ public class FileDownloaderTest {
// Receives fileReference, should return and make it available to caller
String filename = "abc.jar";
- receiveFile(fileReference, filename, "some other content");
+ receiveFile(fileReference, filename, FileReferenceData.Type.file, "some other content");
Optional<File> downloadedFile = fileDownloader.getFile(fileReference);
assertTrue(downloadedFile.isPresent());
@@ -168,7 +198,7 @@ public class FileDownloaderTest {
public void receiveFile() throws IOException {
FileReference foo = new FileReference("foo");
String filename = "foo.jar";
- receiveFile(foo, filename, "content");
+ receiveFile(foo, filename, FileReferenceData.Type.file, "content");
File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename);
assertEquals("content", IOUtils.readFile(downloadedFile));
}
@@ -187,10 +217,12 @@ public class FileDownloaderTest {
assertEquals(expectedDownloadStatus, downloadStatus, 0.0001);
}
- private void receiveFile(FileReference fileReference, String filename, String content) {
- byte[] contentBytes = Utf8.toBytes(content);
- long xxHashFromContent = hasher.hash(ByteBuffer.wrap(contentBytes), 0);
- fileDownloader.receiveFile(fileReference, filename, contentBytes, xxHashFromContent);
+ private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, String content) {
+ receiveFile(fileReference, filename, type, Utf8.toBytes(content));
+ }
+
+ private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) {
+ fileDownloader.receiveFile(new FileReferenceData(fileReference, filename, type, content));
}
private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection {