summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-12-08 14:23:09 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-12-08 14:23:09 +0100
commit61777aa4809fa96a20c0b0c2a05b4a4fb38c2b26 (patch)
tree391661a6bfba927873f28030f82f3bf453906d02 /filedistribution
parent21f36060bcf0577ee450d4cce56edf5941d50fa5 (diff)
Add the receiving part of the streaming transfer.
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java134
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java2
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java59
3 files changed, 153 insertions, 42 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 7759a79cce9..d5543fa6823 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -3,12 +3,12 @@
package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
-import com.yahoo.jrt.DataValue;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Supervisor;
import com.yahoo.log.LogLevel;
+import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;
@@ -17,8 +17,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
-import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -41,39 +41,87 @@ public class FileReceiver {
private final AtomicInteger nextSessionId = new AtomicInteger(1);
private final Map<Integer, Session> sessions = new HashMap();
- private final class Session {
+ final static class Session {
+ private final StreamingXXHash64 hasher;
private final int sessionId;
private final FileReference reference;
+ private final FileReferenceData.Type fileType;
private final String fileName;
private final long fileSize;
private long currentFileSize;
private long currentPartId;
- private final File file;
- Session(int sessionId, FileReference reference, String fileName, long fileSize) throws IOException{
+ private long currentHash;
+ private final File fileReferenceDir;
+ private final File inprogressFile;
+
+ Session(File downloadDirectory, int sessionId, FileReference reference,
+ FileReferenceData.Type fileType, String fileName, long fileSize)
+ {
+ this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0);
this.sessionId = sessionId;
this.reference = reference;
+ this.fileType = fileType;
this.fileName = fileName;
this.fileSize = fileSize;
currentFileSize = 0;
currentPartId = 0;
- File fileReferenceDir = new File(downloadDirectory, reference.value());
- Files.createDirectories(fileReferenceDir.toPath());
- file = new File(fileReferenceDir, fileName);
- if ( ! file.createNewFile()) {
- throw new IllegalStateException("File " + file + "already exists.");
+ currentHash = 0;
+ fileReferenceDir = new File(downloadDirectory, reference.value());
+ try {
+ Files.createDirectories(fileReferenceDir.toPath());
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed creating directory(" + fileReferenceDir.toPath() + "): " + e.getMessage(), e);
+ throw new RuntimeException("Failed creating directory(" + fileReferenceDir.toPath() + "): ", e);
+ }
+
+ try {
+ inprogressFile = Files.createTempFile(fileReferenceDir.toPath(), fileName, ".inprogress").toFile();
+ } catch (IOException e) {
+ String msg = "Failed creating tempfile for inprogress file for(" + fileName + ") in '" + fileReferenceDir.toPath() + "': ";
+ log.log(LogLevel.ERROR, msg + e.getMessage(), e);
+ throw new RuntimeException(msg, e);
}
}
- void addPart(int partId, byte [] part) throws IOException {
+
+ void addPart(int partId, byte [] part) {
if (partId != currentPartId) {
throw new IllegalStateException("Received partid " + partId + " while expecting " + currentPartId);
}
if (fileSize < currentFileSize + part.length) {
- throw new IllegalStateException("Received part would extend the file from " + currentFileSize + " to " +
- (currentFileSize + part.length) + "but " + fileSize + " is max.");
+ throw new IllegalStateException("Received part would extend the inprogressFile from " + currentFileSize + " to " +
+ (currentFileSize + part.length) + ", but " + fileSize + " is max.");
+ }
+ try {
+ Files.write(inprogressFile.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing to file(" + inprogressFile.toPath() + "): " + e.getMessage(), e);
+ throw new RuntimeException("Failed writing to file(" + inprogressFile.toPath() + "): ", e);
}
- Files.write(file.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
currentFileSize += part.length;
currentPartId++;
+ hasher.update(part, 0, part.length);
+ }
+ File close(long hash) {
+ if (hasher.getValue() != hash) {
+ throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")");
+ }
+ File file = new File(fileReferenceDir, fileName);
+ try {
+ // Unpack if necessary
+ if (fileType == FileReferenceData.Type.compressed) {
+ File decompressedDir = Files.createTempDirectory("archive").toFile();
+ log.log(LogLevel.DEBUG, "Archived inprogressFile, unpacking " + inprogressFile + " to " + decompressedDir);
+ CompressedFileReference.decompress(inprogressFile, decompressedDir);
+ moveFileToDestination(decompressedDir, fileReferenceDir);
+ } else {
+ log.log(LogLevel.DEBUG, "Uncompressed inprogressFile, moving to " + file.getAbsolutePath());
+ moveFileToDestination(inprogressFile, file);
+ }
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Failed writing inprogressFile: " + e.getMessage(), e);
+ throw new RuntimeException("Failed writing inprogressFile: ", e);
+ }
+ return file;
}
}
@@ -92,20 +140,21 @@ public class FileReceiver {
// receiveFile after getting a serveFile method call). handler needs to implement receiveFile method
private List<Method> receiveFileMethod(Object handler) {
List<Method> methods = new ArrayList<>();
- methods.add(new Method(RECEIVE_META_METHOD, "ssl", "ii", handler,"receiveFileMeta")
- .paramDesc(0, "filereference", "file reference to download")
- .paramDesc(1, "filename", "filename")
- .paramDesc(2, "filelength", "length in bytes of file")
+ methods.add(new Method(RECEIVE_META_METHOD, "sssl", "ii", handler,"receiveFileMeta")
+ .paramDesc(0, "filereference", "inprogressFile reference to download")
+ .paramDesc(1, "type", "'inprogressFile' or 'compressed'")
+ .paramDesc(2, "filename", "filename")
+ .paramDesc(3, "filelength", "length in bytes of inprogressFile")
.returnDesc(0, "ret", "0 if success, 1 otherwise")
.returnDesc(1, "session-id", "Session id to be used for this transfer"));
methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", handler,"receiveFilePart")
- .paramDesc(0, "filereference", "file reference to download")
+ .paramDesc(0, "filereference", "inprogressFile reference to download")
.paramDesc(1, "session-id", "Session id to be used for this transfer")
.paramDesc(2, "partid", "relative part number starting at zero")
.paramDesc(3, "data", "bytes in this part")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
methods.add(new Method(RECEIVE_EOF_METHOD, "silis", "i", handler,"receiveFileEof")
- .paramDesc(0, "filereference", "file reference to download")
+ .paramDesc(0, "filereference", "inprogressFile reference to download")
.paramDesc(1, "session-id", "Session id to be used for this transfer")
.paramDesc(2, "crc-code", "crc code (xxhash64)")
.paramDesc(3, "error-code", "Error code. 0 if none")
@@ -113,12 +162,12 @@ public class FileReceiver {
.returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise"));
// Temporary method until we have chunking
methods.add(new Method(RECEIVE_METHOD, "sssxlis", "i", handler, "receiveFile")
- .methodDesc("receive file reference content")
- .paramDesc(0, "file reference", "file reference to download")
+ .methodDesc("receive inprogressFile reference content")
+ .paramDesc(0, "inprogressFile reference", "inprogressFile reference to download")
.paramDesc(1, "filename", "filename")
- .paramDesc(2, "type", "'file' or 'compressed'")
+ .paramDesc(2, "type", "'inprogressFile' or 'compressed'")
.paramDesc(3, "content", "array of bytes")
- .paramDesc(4, "hash", "xx64hash of the file content")
+ .paramDesc(4, "hash", "xx64hash of the inprogressFile content")
.paramDesc(5, "errorcode", "Error code. 0 if none")
.paramDesc(6, "error-description", "Error description.")
.returnDesc(0, "ret", "0 if success, 1 otherwise"));
@@ -137,11 +186,11 @@ public class FileReceiver {
if (errorCode == 0) {
// TODO: Remove when system test works
- log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'");
+ log.log(LogLevel.INFO, "Receiving inprogressFile reference '" + fileReference.value() + "'");
receiveFile(new FileReferenceData(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash));
req.returnValues().add(new Int32Value(0));
} else {
- log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription);
+ log.log(LogLevel.WARNING, "Receiving inprogressFile reference '" + fileReference.value() + "' failed: " + errorDescription);
req.returnValues().add(new Int32Value(1));
// TODO: Add error description return value here too?
}
@@ -149,11 +198,12 @@ public class FileReceiver {
void receiveFile(FileReferenceData fileReferenceData) {
long xxHashFromContent = hasher.hash(ByteBuffer.wrap(fileReferenceData.content()), 0);
- if (xxHashFromContent != fileReferenceData.xxhash())
- throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash()+ ")");
+ if (xxHashFromContent != fileReferenceData.xxhash()) {
+ throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash() + ")");
+ }
File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value());
- // file might be a directory (and then type is compressed)
+ // inprogressFile might be a directory (and then type is compressed)
File file = new File(fileReferenceDir, fileReferenceData.filename());
try {
File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), fileReferenceData.filename());
@@ -162,31 +212,31 @@ public class FileReceiver {
// Unpack if necessary
if (fileReferenceData.type() == FileReferenceData.Type.compressed) {
File decompressedDir = Files.createTempDirectory("decompressed").toFile();
- log.log(LogLevel.DEBUG, "Compressed file, unpacking " + tempFile + " to " + decompressedDir);
+ log.log(LogLevel.DEBUG, "Compressed inprogressFile, unpacking " + tempFile + " to " + decompressedDir);
CompressedFileReference.decompress(tempFile, decompressedDir);
moveFileToDestination(decompressedDir, fileReferenceDir);
} else {
- log.log(LogLevel.DEBUG, "Uncompressed file, moving to " + file.getAbsolutePath());
+ log.log(LogLevel.DEBUG, "Uncompressed inprogressFile, moving to " + file.getAbsolutePath());
Files.createDirectories(fileReferenceDir.toPath());
moveFileToDestination(tempFile, file);
}
downloader.completedDownloading(fileReferenceData.fileReference(), file);
} catch (IOException e) {
- log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e);
- throw new RuntimeException("Failed writing file: ", e);
+ log.log(LogLevel.ERROR, "Failed writing inprogressFile: " + e.getMessage(), e);
+ throw new RuntimeException("Failed writing inprogressFile: ", e);
}
}
- private void moveFileToDestination(File tempFile, File destination) {
+ private static void moveFileToDestination(File tempFile, File destination) {
try {
Files.move(tempFile.toPath(), destination.toPath());
log.log(LogLevel.INFO, "File moved from " + tempFile.getAbsolutePath()+ " to " + destination.getAbsolutePath());
} catch (FileAlreadyExistsException e) {
- // Don't fail if it already exists (we might get the file from several config servers when retrying, servers are down etc.
+ // Don't fail if it already exists (we might get the inprogressFile from several config servers when retrying, servers are down etc.
// so it might be written already)
log.log(LogLevel.INFO, "File '" + destination.getAbsolutePath() + "' already exists, continuing: " + e.getMessage());
} catch (IOException e) {
- String message = "Failed moving file '" + tempFile.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'";
+ String message = "Failed moving inprogressFile '" + tempFile.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'";
log.log(LogLevel.ERROR, message, e);
throw new RuntimeException(message, e);
}
@@ -196,8 +246,9 @@ public class FileReceiver {
public final void receiveFileMeta(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
FileReference reference = new FileReference(req.parameters().get(0).asString());
- String fileName = req.parameters().get(1).asString();
- long fileSize = req.parameters().get(2).asInt64();
+ String type = req.parameters().get(1).asString();
+ String fileName = req.parameters().get(2).asString();
+ long fileSize = req.parameters().get(3).asInt64();
int sessionId = nextSessionId.getAndIncrement();
int retval = 0;
synchronized (sessions) {
@@ -206,7 +257,8 @@ public class FileReceiver {
log.severe("Session id " + sessionId + " already exist, impossible. Request from(" + req.target() + ")");
} else {
try {
- sessions.put(sessionId, new Session(sessionId, reference, fileName, fileSize));
+ sessions.put(sessionId, new Session(downloadDirectory, sessionId, reference,
+ FileReferenceData.Type.valueOf(type),fileName, fileSize));
} catch (Exception e) {
retval = 1;
}
@@ -240,9 +292,11 @@ public class FileReceiver {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
FileReference reference = new FileReference(req.parameters().get(0).asString());
int sessionId = req.parameters().get(1).asInt32();
+ long xxhash = req.parameters().get(2).asInt64();
Session session = getSession(sessionId);
int retval = verifySession(session, sessionId, reference);
- downloader.completedDownloading(reference, session.file);
+ File file = session.close(xxhash);
+ downloader.completedDownloading(reference, file);
synchronized (sessions) {
sessions.remove(sessionId);
}
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
index 6272390f5cb..809e9e8a6a7 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java
@@ -15,8 +15,6 @@ public class FileReferenceData {
public enum Type {file, compressed}
-
-
private final FileReference fileReference;
private final String filename;
private final Type type;
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
new file mode 100644
index 00000000000..5edd1151cb1
--- /dev/null
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
@@ -0,0 +1,59 @@
+package com.yahoo.vespa.filedistribution;
+
+import com.yahoo.config.FileReference;
+import com.yahoo.text.Utf8;
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+
+public class FileReceiverTest {
+ private final File root = new File(".");
+ private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
+ @Test
+ public void receiveMultiPartFile() throws IOException{
+
+ String [] parts = new String[3];
+ parts[0] = "first part\n";
+ parts[1] = "second part\n";
+ parts[2] = "third part\n";
+ StringBuilder sb = new StringBuilder();
+ for (String s : parts) {
+ sb.append(s);
+ }
+ String all = sb.toString();
+ String allRead = transferParts(new FileReference("ref-a"), "myfile-1", all, 1);
+ assertEquals(all, allRead);
+ allRead = transferParts(new FileReference("ref-a"), "myfile-2", all, 2);
+ assertEquals(all, allRead);
+ allRead = transferParts(new FileReference("ref-a"), "myfile-3", all, 3);
+ assertEquals(all, allRead);
+
+ }
+
+ private String transferParts(FileReference ref, String fileName, String all, int numParts) throws IOException {
+ byte [] allContent = Utf8.toBytes(all);
+
+ FileReceiver.Session session = new FileReceiver.Session(root, 1, ref,
+ FileReferenceData.Type.file, fileName, allContent.length);
+ int partSize = (allContent.length+(numParts-1))/numParts;
+ ByteBuffer bb = ByteBuffer.wrap(allContent);
+ for (int i = 0, pos = 0; i < numParts; i++) {
+ byte [] buf = new byte[Math.min(partSize, allContent.length - pos)];
+ bb.get(buf);
+ session.addPart(i, buf);
+ pos += buf.length;
+ }
+ File file = session.close(hasher.hash(ByteBuffer.wrap(allContent), 0));
+
+ byte [] allReadBytes = Files.readAllBytes(file.toPath());
+ file.delete();
+ return Utf8.toString(allReadBytes);
+ }
+}