diff options
Diffstat (limited to 'filedistribution')
3 files changed, 153 insertions, 42 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 7759a79cce9..d5543fa6823 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -3,12 +3,12 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; -import com.yahoo.jrt.DataValue; import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Method; import com.yahoo.jrt.Request; import com.yahoo.jrt.Supervisor; import com.yahoo.log.LogLevel; +import net.jpountz.xxhash.StreamingXXHash64; import net.jpountz.xxhash.XXHash64; import net.jpountz.xxhash.XXHashFactory; @@ -17,8 +17,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; -import java.nio.file.OpenOption; import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -41,39 +41,87 @@ public class FileReceiver { private final AtomicInteger nextSessionId = new AtomicInteger(1); private final Map<Integer, Session> sessions = new HashMap(); - private final class Session { + final static class Session { + private final StreamingXXHash64 hasher; private final int sessionId; private final FileReference reference; + private final FileReferenceData.Type fileType; private final String fileName; private final long fileSize; private long currentFileSize; private long currentPartId; - private final File file; - Session(int sessionId, FileReference reference, String fileName, long fileSize) throws IOException{ + private long currentHash; + private final File fileReferenceDir; + private final File inprogressFile; + + Session(File downloadDirectory, int sessionId, FileReference reference, + FileReferenceData.Type fileType, String fileName, long fileSize) + { + this.hasher = XXHashFactory.fastestInstance().newStreamingHash64(0); this.sessionId = sessionId; this.reference = reference; + this.fileType = fileType; this.fileName = fileName; this.fileSize = fileSize; currentFileSize = 0; currentPartId = 0; - File fileReferenceDir = new File(downloadDirectory, reference.value()); - Files.createDirectories(fileReferenceDir.toPath()); - file = new File(fileReferenceDir, fileName); - if ( ! file.createNewFile()) { - throw new IllegalStateException("File " + file + "already exists."); + currentHash = 0; + fileReferenceDir = new File(downloadDirectory, reference.value()); + try { + Files.createDirectories(fileReferenceDir.toPath()); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed creating directory(" + fileReferenceDir.toPath() + "): " + e.getMessage(), e); + throw new RuntimeException("Failed creating directory(" + fileReferenceDir.toPath() + "): ", e); + } + + try { + inprogressFile = Files.createTempFile(fileReferenceDir.toPath(), fileName, ".inprogress").toFile(); + } catch (IOException e) { + String msg = "Failed creating tempfile for inprogress file for(" + fileName + ") in '" + fileReferenceDir.toPath() + "': "; + log.log(LogLevel.ERROR, msg + e.getMessage(), e); + throw new RuntimeException(msg, e); } } - void addPart(int partId, byte [] part) throws IOException { + + void addPart(int partId, byte [] part) { if (partId != currentPartId) { throw new IllegalStateException("Received partid " + partId + " while expecting " + currentPartId); } if (fileSize < currentFileSize + part.length) { - throw new IllegalStateException("Received part would extend the file from " + currentFileSize + " to " + - (currentFileSize + part.length) + "but " + fileSize + " is max."); + throw new IllegalStateException("Received part would extend the inprogressFile from " + currentFileSize + " to " + + (currentFileSize + part.length) + ", but " + fileSize + " is max."); + } + try { + Files.write(inprogressFile.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing to file(" + inprogressFile.toPath() + "): " + e.getMessage(), e); + throw new RuntimeException("Failed writing to file(" + inprogressFile.toPath() + "): ", e); } - Files.write(file.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND); currentFileSize += part.length; currentPartId++; + hasher.update(part, 0, part.length); + } + File close(long hash) { + if (hasher.getValue() != hash) { + throw new RuntimeException("xxhash from content (" + currentHash + ") is not equal to xxhash in request (" + hash + ")"); + } + File file = new File(fileReferenceDir, fileName); + try { + // Unpack if necessary + if (fileType == FileReferenceData.Type.compressed) { + File decompressedDir = Files.createTempDirectory("archive").toFile(); + log.log(LogLevel.DEBUG, "Archived inprogressFile, unpacking " + inprogressFile + " to " + decompressedDir); + CompressedFileReference.decompress(inprogressFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); + } else { + log.log(LogLevel.DEBUG, "Uncompressed inprogressFile, moving to " + file.getAbsolutePath()); + moveFileToDestination(inprogressFile, file); + } + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing inprogressFile: " + e.getMessage(), e); + throw new RuntimeException("Failed writing inprogressFile: ", e); + } + return file; } } @@ -92,20 +140,21 @@ public class FileReceiver { // receiveFile after getting a serveFile method call). handler needs to implement receiveFile method private List<Method> receiveFileMethod(Object handler) { List<Method> methods = new ArrayList<>(); - methods.add(new Method(RECEIVE_META_METHOD, "ssl", "ii", handler,"receiveFileMeta") - .paramDesc(0, "filereference", "file reference to download") - .paramDesc(1, "filename", "filename") - .paramDesc(2, "filelength", "length in bytes of file") + methods.add(new Method(RECEIVE_META_METHOD, "sssl", "ii", handler,"receiveFileMeta") + .paramDesc(0, "filereference", "inprogressFile reference to download") + .paramDesc(1, "type", "'inprogressFile' or 'compressed'") + .paramDesc(2, "filename", "filename") + .paramDesc(3, "filelength", "length in bytes of inprogressFile") .returnDesc(0, "ret", "0 if success, 1 otherwise") .returnDesc(1, "session-id", "Session id to be used for this transfer")); methods.add(new Method(RECEIVE_PART_METHOD, "siix", "i", handler,"receiveFilePart") - .paramDesc(0, "filereference", "file reference to download") + .paramDesc(0, "filereference", "inprogressFile reference to download") .paramDesc(1, "session-id", "Session id to be used for this transfer") .paramDesc(2, "partid", "relative part number starting at zero") .paramDesc(3, "data", "bytes in this part") .returnDesc(0, "ret", "0 if success, 1 otherwise")); methods.add(new Method(RECEIVE_EOF_METHOD, "silis", "i", handler,"receiveFileEof") - .paramDesc(0, "filereference", "file reference to download") + .paramDesc(0, "filereference", "inprogressFile reference to download") .paramDesc(1, "session-id", "Session id to be used for this transfer") .paramDesc(2, "crc-code", "crc code (xxhash64)") .paramDesc(3, "error-code", "Error code. 0 if none") @@ -113,12 +162,12 @@ public class FileReceiver { .returnDesc(0, "ret", "0 if success, 1 if crc mismatch, 2 otherwise")); // Temporary method until we have chunking methods.add(new Method(RECEIVE_METHOD, "sssxlis", "i", handler, "receiveFile") - .methodDesc("receive file reference content") - .paramDesc(0, "file reference", "file reference to download") + .methodDesc("receive inprogressFile reference content") + .paramDesc(0, "inprogressFile reference", "inprogressFile reference to download") .paramDesc(1, "filename", "filename") - .paramDesc(2, "type", "'file' or 'compressed'") + .paramDesc(2, "type", "'inprogressFile' or 'compressed'") .paramDesc(3, "content", "array of bytes") - .paramDesc(4, "hash", "xx64hash of the file content") + .paramDesc(4, "hash", "xx64hash of the inprogressFile content") .paramDesc(5, "errorcode", "Error code. 0 if none") .paramDesc(6, "error-description", "Error description.") .returnDesc(0, "ret", "0 if success, 1 otherwise")); @@ -137,11 +186,11 @@ public class FileReceiver { if (errorCode == 0) { // TODO: Remove when system test works - log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); + log.log(LogLevel.INFO, "Receiving inprogressFile reference '" + fileReference.value() + "'"); receiveFile(new FileReferenceData(fileReference, filename, FileReferenceData.Type.valueOf(type), content, xxhash)); req.returnValues().add(new Int32Value(0)); } else { - log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); + log.log(LogLevel.WARNING, "Receiving inprogressFile reference '" + fileReference.value() + "' failed: " + errorDescription); req.returnValues().add(new Int32Value(1)); // TODO: Add error description return value here too? } @@ -149,11 +198,12 @@ public class FileReceiver { void receiveFile(FileReferenceData fileReferenceData) { long xxHashFromContent = hasher.hash(ByteBuffer.wrap(fileReferenceData.content()), 0); - if (xxHashFromContent != fileReferenceData.xxhash()) - throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash()+ ")"); + if (xxHashFromContent != fileReferenceData.xxhash()) { + throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash() + ")"); + } File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value()); - // file might be a directory (and then type is compressed) + // inprogressFile might be a directory (and then type is compressed) File file = new File(fileReferenceDir, fileReferenceData.filename()); try { File tempFile = new File(Files.createTempDirectory("downloaded").toFile(), fileReferenceData.filename()); @@ -162,31 +212,31 @@ public class FileReceiver { // Unpack if necessary if (fileReferenceData.type() == FileReferenceData.Type.compressed) { File decompressedDir = Files.createTempDirectory("decompressed").toFile(); - log.log(LogLevel.DEBUG, "Compressed file, unpacking " + tempFile + " to " + decompressedDir); + log.log(LogLevel.DEBUG, "Compressed inprogressFile, unpacking " + tempFile + " to " + decompressedDir); CompressedFileReference.decompress(tempFile, decompressedDir); moveFileToDestination(decompressedDir, fileReferenceDir); } else { - log.log(LogLevel.DEBUG, "Uncompressed file, moving to " + file.getAbsolutePath()); + log.log(LogLevel.DEBUG, "Uncompressed inprogressFile, moving to " + file.getAbsolutePath()); Files.createDirectories(fileReferenceDir.toPath()); moveFileToDestination(tempFile, file); } downloader.completedDownloading(fileReferenceData.fileReference(), file); } catch (IOException e) { - log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e); - throw new RuntimeException("Failed writing file: ", e); + log.log(LogLevel.ERROR, "Failed writing inprogressFile: " + e.getMessage(), e); + throw new RuntimeException("Failed writing inprogressFile: ", e); } } - private void moveFileToDestination(File tempFile, File destination) { + private static void moveFileToDestination(File tempFile, File destination) { try { Files.move(tempFile.toPath(), destination.toPath()); log.log(LogLevel.INFO, "File moved from " + tempFile.getAbsolutePath()+ " to " + destination.getAbsolutePath()); } catch (FileAlreadyExistsException e) { - // Don't fail if it already exists (we might get the file from several config servers when retrying, servers are down etc. + // Don't fail if it already exists (we might get the inprogressFile from several config servers when retrying, servers are down etc. // so it might be written already) log.log(LogLevel.INFO, "File '" + destination.getAbsolutePath() + "' already exists, continuing: " + e.getMessage()); } catch (IOException e) { - String message = "Failed moving file '" + tempFile.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'"; + String message = "Failed moving inprogressFile '" + tempFile.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'"; log.log(LogLevel.ERROR, message, e); throw new RuntimeException(message, e); } @@ -196,8 +246,9 @@ public class FileReceiver { public final void receiveFileMeta(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); FileReference reference = new FileReference(req.parameters().get(0).asString()); - String fileName = req.parameters().get(1).asString(); - long fileSize = req.parameters().get(2).asInt64(); + String type = req.parameters().get(1).asString(); + String fileName = req.parameters().get(2).asString(); + long fileSize = req.parameters().get(3).asInt64(); int sessionId = nextSessionId.getAndIncrement(); int retval = 0; synchronized (sessions) { @@ -206,7 +257,8 @@ public class FileReceiver { log.severe("Session id " + sessionId + " already exist, impossible. Request from(" + req.target() + ")"); } else { try { - sessions.put(sessionId, new Session(sessionId, reference, fileName, fileSize)); + sessions.put(sessionId, new Session(downloadDirectory, sessionId, reference, + FileReferenceData.Type.valueOf(type),fileName, fileSize)); } catch (Exception e) { retval = 1; } @@ -240,9 +292,11 @@ public class FileReceiver { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); FileReference reference = new FileReference(req.parameters().get(0).asString()); int sessionId = req.parameters().get(1).asInt32(); + long xxhash = req.parameters().get(2).asInt64(); Session session = getSession(sessionId); int retval = verifySession(session, sessionId, reference); - downloader.completedDownloading(reference, session.file); + File file = session.close(xxhash); + downloader.completedDownloading(reference, file); synchronized (sessions) { sessions.remove(sessionId); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java index 6272390f5cb..809e9e8a6a7 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java @@ -15,8 +15,6 @@ public class FileReferenceData { public enum Type {file, compressed} - - private final FileReference fileReference; private final String filename; private final Type type; diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java new file mode 100644 index 00000000000..5edd1151cb1 --- /dev/null +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -0,0 +1,59 @@ +package com.yahoo.vespa.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.text.Utf8; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; + +public class FileReceiverTest { + private final File root = new File("."); + private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + @Test + public void receiveMultiPartFile() throws IOException{ + + String [] parts = new String[3]; + parts[0] = "first part\n"; + parts[1] = "second part\n"; + parts[2] = "third part\n"; + StringBuilder sb = new StringBuilder(); + for (String s : parts) { + sb.append(s); + } + String all = sb.toString(); + String allRead = transferParts(new FileReference("ref-a"), "myfile-1", all, 1); + assertEquals(all, allRead); + allRead = transferParts(new FileReference("ref-a"), "myfile-2", all, 2); + assertEquals(all, allRead); + allRead = transferParts(new FileReference("ref-a"), "myfile-3", all, 3); + assertEquals(all, allRead); + + } + + private String transferParts(FileReference ref, String fileName, String all, int numParts) throws IOException { + byte [] allContent = Utf8.toBytes(all); + + FileReceiver.Session session = new FileReceiver.Session(root, 1, ref, + FileReferenceData.Type.file, fileName, allContent.length); + int partSize = (allContent.length+(numParts-1))/numParts; + ByteBuffer bb = ByteBuffer.wrap(allContent); + for (int i = 0, pos = 0; i < numParts; i++) { + byte [] buf = new byte[Math.min(partSize, allContent.length - pos)]; + bb.get(buf); + session.addPart(i, buf); + pos += buf.length; + } + File file = session.close(hasher.hash(ByteBuffer.wrap(allContent), 0)); + + byte [] allReadBytes = Files.readAllBytes(file.toPath()); + file.delete(); + return Utf8.toString(allReadBytes); + } +} |