summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-11-17 14:45:05 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-12-07 22:56:15 +0100
commit21f36060bcf0577ee450d4cce56edf5941d50fa5 (patch)
tree24f488275095652f564b74b2c98f7e67f0890b0a /filedistribution
parent5577b918e2c66b9c0238892e4855ed17673c28f2 (diff)
Implement chunked receiver.
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java102
1 files changed, 102 insertions, 0 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index 1ac3a1bd7df..7759a79cce9 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -3,6 +3,7 @@
package com.yahoo.vespa.filedistribution;
import com.yahoo.config.FileReference;
+import com.yahoo.jrt.DataValue;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
@@ -16,8 +17,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
public class FileReceiver {
@@ -32,6 +38,44 @@ public class FileReceiver {
private final FileReferenceDownloader downloader;
private final File downloadDirectory;
private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64();
+ private final AtomicInteger nextSessionId = new AtomicInteger(1);
+ private final Map<Integer, Session> sessions = new HashMap();
+
+ private final class Session {
+ private final int sessionId;
+ private final FileReference reference;
+ private final String fileName;
+ private final long fileSize;
+ private long currentFileSize;
+ private long currentPartId;
+ private final File file;
+ Session(int sessionId, FileReference reference, String fileName, long fileSize) throws IOException{
+ this.sessionId = sessionId;
+ this.reference = reference;
+ this.fileName = fileName;
+ this.fileSize = fileSize;
+ currentFileSize = 0;
+ currentPartId = 0;
+ File fileReferenceDir = new File(downloadDirectory, reference.value());
+ Files.createDirectories(fileReferenceDir.toPath());
+ file = new File(fileReferenceDir, fileName);
+ if ( ! file.createNewFile()) {
+ throw new IllegalStateException("File " + file + "already exists.");
+ }
+ }
+ void addPart(int partId, byte [] part) throws IOException {
+ if (partId != currentPartId) {
+ throw new IllegalStateException("Received partid " + partId + " while expecting " + currentPartId);
+ }
+ if (fileSize < currentFileSize + part.length) {
+ throw new IllegalStateException("Received part would extend the file from " + currentFileSize + " to " +
+ (currentFileSize + part.length) + "but " + fileSize + " is max.");
+ }
+ Files.write(file.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+ currentFileSize += part.length;
+ currentPartId++;
+ }
+ }
FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) {
this.supervisor = supervisor;
@@ -151,15 +195,73 @@ public class FileReceiver {
@SuppressWarnings({"UnusedDeclaration"})
public final void receiveFileMeta(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+ FileReference reference = new FileReference(req.parameters().get(0).asString());
+ String fileName = req.parameters().get(1).asString();
+ long fileSize = req.parameters().get(2).asInt64();
+ int sessionId = nextSessionId.getAndIncrement();
+ int retval = 0;
+ synchronized (sessions) {
+ if (sessions.containsKey(sessionId)) {
+ retval = 1;
+ log.severe("Session id " + sessionId + " already exist, impossible. Request from(" + req.target() + ")");
+ } else {
+ try {
+ sessions.put(sessionId, new Session(sessionId, reference, fileName, fileSize));
+ } catch (Exception e) {
+ retval = 1;
+ }
+ }
+ }
+ req.returnValues().add(new Int32Value(retval));
+ req.returnValues().add(new Int32Value(sessionId));
}
@SuppressWarnings({"UnusedDeclaration"})
public final void receiveFilePart(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+
+ FileReference reference = new FileReference(req.parameters().get(0).asString());
+ int sessionId = req.parameters().get(1).asInt32();
+ int partId = req.parameters().get(2).asInt32();
+ byte [] part = req.parameters().get(3).asData();
+ Session session = getSession(sessionId);
+ int retval = verifySession(session, sessionId, reference);
+ try {
+ session.addPart(partId, part);
+ } catch (Exception e) {
+ log.severe("Got exception + " + e);
+ retval = 1;
+ }
+ req.returnValues().add(new Int32Value(retval));
}
@SuppressWarnings({"UnusedDeclaration"})
public final void receiveFileEof(Request req) {
log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
+ FileReference reference = new FileReference(req.parameters().get(0).asString());
+ int sessionId = req.parameters().get(1).asInt32();
+ Session session = getSession(sessionId);
+ int retval = verifySession(session, sessionId, reference);
+ downloader.completedDownloading(reference, session.file);
+ synchronized (sessions) {
+ sessions.remove(sessionId);
+ }
+ }
+
+ private final Session getSession(Integer sessionId) {
+ synchronized (sessions) {
+ return sessions.get(sessionId);
+ }
+ }
+ private static final int verifySession(Session session, int sessionId, FileReference reference) {
+ if (session == null) {
+ log.severe("session-id " + sessionId + " does not exist.");
+ return 1;
+ }
+ if (! session.reference.equals(reference)) {
+ log.severe("Session " + session.sessionId + " expects reference " + reference.value() + ", but was " + session.reference.value());
+ return 1;
+ }
+ return 0;
}
}