diff options
Diffstat (limited to 'filedistribution')
-rw-r--r-- | filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 1ac3a1bd7df..7759a79cce9 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.filedistribution; import com.yahoo.config.FileReference; +import com.yahoo.jrt.DataValue; import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Method; import com.yahoo.jrt.Request; @@ -16,8 +17,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public class FileReceiver { @@ -32,6 +38,44 @@ public class FileReceiver { private final FileReferenceDownloader downloader; private final File downloadDirectory; private final XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + private final AtomicInteger nextSessionId = new AtomicInteger(1); + private final Map<Integer, Session> sessions = new HashMap(); + + private final class Session { + private final int sessionId; + private final FileReference reference; + private final String fileName; + private final long fileSize; + private long currentFileSize; + private long currentPartId; + private final File file; + Session(int sessionId, FileReference reference, String fileName, long fileSize) throws IOException{ + this.sessionId = sessionId; + this.reference = reference; + this.fileName = fileName; + this.fileSize = fileSize; + currentFileSize = 0; + currentPartId = 0; + File fileReferenceDir = new File(downloadDirectory, reference.value()); + Files.createDirectories(fileReferenceDir.toPath()); + file = new File(fileReferenceDir, fileName); + if ( ! file.createNewFile()) { + throw new IllegalStateException("File " + file + "already exists."); + } + } + void addPart(int partId, byte [] part) throws IOException { + if (partId != currentPartId) { + throw new IllegalStateException("Received partid " + partId + " while expecting " + currentPartId); + } + if (fileSize < currentFileSize + part.length) { + throw new IllegalStateException("Received part would extend the file from " + currentFileSize + " to " + + (currentFileSize + part.length) + "but " + fileSize + " is max."); + } + Files.write(file.toPath(), part, StandardOpenOption.WRITE, StandardOpenOption.APPEND); + currentFileSize += part.length; + currentPartId++; + } + } FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory) { this.supervisor = supervisor; @@ -151,15 +195,73 @@ public class FileReceiver { @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileMeta(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + FileReference reference = new FileReference(req.parameters().get(0).asString()); + String fileName = req.parameters().get(1).asString(); + long fileSize = req.parameters().get(2).asInt64(); + int sessionId = nextSessionId.getAndIncrement(); + int retval = 0; + synchronized (sessions) { + if (sessions.containsKey(sessionId)) { + retval = 1; + log.severe("Session id " + sessionId + " already exist, impossible. Request from(" + req.target() + ")"); + } else { + try { + sessions.put(sessionId, new Session(sessionId, reference, fileName, fileSize)); + } catch (Exception e) { + retval = 1; + } + } + } + req.returnValues().add(new Int32Value(retval)); + req.returnValues().add(new Int32Value(sessionId)); } @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFilePart(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + + FileReference reference = new FileReference(req.parameters().get(0).asString()); + int sessionId = req.parameters().get(1).asInt32(); + int partId = req.parameters().get(2).asInt32(); + byte [] part = req.parameters().get(3).asData(); + Session session = getSession(sessionId); + int retval = verifySession(session, sessionId, reference); + try { + session.addPart(partId, part); + } catch (Exception e) { + log.severe("Got exception + " + e); + retval = 1; + } + req.returnValues().add(new Int32Value(retval)); } @SuppressWarnings({"UnusedDeclaration"}) public final void receiveFileEof(Request req) { log.info("Received method call '" + req.methodName() + "' with parameters : " + req.parameters()); + FileReference reference = new FileReference(req.parameters().get(0).asString()); + int sessionId = req.parameters().get(1).asInt32(); + Session session = getSession(sessionId); + int retval = verifySession(session, sessionId, reference); + downloader.completedDownloading(reference, session.file); + synchronized (sessions) { + sessions.remove(sessionId); + } + } + + private final Session getSession(Integer sessionId) { + synchronized (sessions) { + return sessions.get(sessionId); + } + } + private static final int verifySession(Session session, int sessionId, FileReference reference) { + if (session == null) { + log.severe("session-id " + sessionId + " does not exist."); + return 1; + } + if (! session.reference.equals(reference)) { + log.severe("Session " + session.sessionId + " expects reference " + reference.value() + ", but was " + session.reference.value()); + return 1; + } + return 0; } } |