diff options
author | Harald Musum <musum@oath.com> | 2017-12-13 23:03:27 +0100 |
---|---|---|
committer | Harald Musum <musum@oath.com> | 2017-12-13 23:03:27 +0100 |
commit | c000686ee70e92af50d434de7c61a9d3de22db27 (patch) | |
tree | 84c2b050ca344623a8e81a25bdfcac39884eab6e /filedistribution | |
parent | 99e5cde1894e5fdc5a0dd75c7ca7013c473d3c7f (diff) |
Update download status after each part received
Diffstat (limited to 'filedistribution')
4 files changed, 34 insertions, 22 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index c4e010e57d5..d57ce4ca5de 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -23,6 +23,13 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; +/** + * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata. + * Uses the same Supervisor as the original caller that requests files, so communication uses the same + * connection in both directions. + * + * @author baldersheim + */ public class FileReceiver { private final static Logger log = Logger.getLogger(FileReceiver.class.getName()); @@ -125,6 +132,10 @@ public class FileReceiver { } return file; } + + double percentageReceived() { + return (double)currentFileSize/(double)fileSize; + } } FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory, File tmpDirectory) { @@ -287,6 +298,9 @@ public class FileReceiver { log.severe("Got exception + " + e); retval = 1; } + double completeness = (double) session.currentFileSize / (double) session.fileSize; + log.log(LogLevel.DEBUG, String.format("%.1f percent of '%s' downloaded", completeness * 100, reference.value())); + downloader.setDownloadStatus(reference, completeness); req.returnValues().add(new Int32Value(retval)); } @@ -306,12 +320,12 @@ public class FileReceiver { req.returnValues().add(new Int32Value(retval)); } - private final Session getSession(Integer sessionId) { + private Session getSession(Integer sessionId) { synchronized (sessions) { return sessions.get(sessionId); } } - private static final int verifySession(Session session, int sessionId, FileReference reference) { + private static int verifySession(Session session, int sessionId, FileReference reference) { if (session == null) { log.severe("session-id " + sessionId + " does not exist."); return 1; diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index 9de4c1fcd5b..048287f0892 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 7793e25f7b2..b8402c8aacf 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -18,7 +18,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -50,9 +49,8 @@ public class FileReferenceDownloader { this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory); } - private void startDownload(FileReference fileReference, Duration timeout, - FileReferenceDownload fileReferenceDownload) - { + private void startDownload(Duration timeout, FileReferenceDownload fileReferenceDownload) { + FileReference fileReference = fileReferenceDownload.fileReference(); synchronized (downloads) { downloads.put(fileReference, fileReferenceDownload); downloadStatus.put(fileReference, 0.0); @@ -67,7 +65,7 @@ public class FileReferenceDownloader { Thread.sleep(10); } } - catch (InterruptedException | ExecutionException e) {} + catch (InterruptedException e) {} } if ( !downloadStarted) { @@ -80,7 +78,7 @@ public class FileReferenceDownloader { void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + downloadExecutor.submit(() -> startDownload(downloadTimeout, fileReferenceDownload)); } void receiveFile(FileReferenceData fileReferenceData) { @@ -100,7 +98,7 @@ public class FileReferenceDownloader { } } - private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { + private boolean startDownloadRpc(FileReference fileReference) { Connection connection = connectionPool.getCurrent(); Request request = new Request("filedistribution.serveFile"); request.parameters().add(new StringValue(fileReference.value())); @@ -173,8 +171,12 @@ public class FileReferenceDownloader { } void setDownloadStatus(String file, double completeness) { + setDownloadStatus(new FileReference(file), completeness); + } + + void setDownloadStatus(FileReference fileReference, double completeness) { synchronized (downloads) { - downloadStatus.put(new FileReference(file), completeness); + downloadStatus.put(fileReference, completeness); } } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java index 34ce53ad4c8..762817c27ef 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -33,7 +33,6 @@ public class FileReceiverTest { @Test public void receiveMultiPartFile() throws IOException{ - String [] parts = new String[3]; parts[0] = "first part\n"; parts[1] = "second part\n"; @@ -43,16 +42,12 @@ public class FileReceiverTest { sb.append(s); } String all = sb.toString(); - String allRead = transferParts(new FileReference("ref-a"), "myfile-1", all, 1); - assertEquals(all, allRead); - allRead = transferParts(new FileReference("ref-a"), "myfile-2", all, 2); - assertEquals(all, allRead); - allRead = transferParts(new FileReference("ref-a"), "myfile-3", all, 3); - assertEquals(all, allRead); - + transferPartsAndAssert(new FileReference("ref-a"), "myfile-1", all, 1); + transferPartsAndAssert(new FileReference("ref-a"), "myfile-2", all, 2); + transferPartsAndAssert(new FileReference("ref-a"), "myfile-3", all, 3); } - private String transferParts(FileReference ref, String fileName, String all, int numParts) throws IOException { + private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException { byte [] allContent = Utf8.toBytes(all); FileReceiver.Session session = new FileReceiver.Session(root, tempDir, 1, ref, @@ -63,12 +58,14 @@ public class FileReceiverTest { byte [] buf = new byte[Math.min(partSize, allContent.length - pos)]; bb.get(buf); session.addPart(i, buf); + // Small numbers, so need a large delta + assertEquals((double)(i+1)/(double)numParts, session.percentageReceived(), 0.04); pos += buf.length; } - File file = session.close(hasher.hash(ByteBuffer.wrap(allContent), 0)); + File file = session.close(hasher.hash(ByteBuffer.wrap(Utf8.toBytes(all)), 0)); byte [] allReadBytes = Files.readAllBytes(file.toPath()); file.delete(); - return Utf8.toString(allReadBytes); + assertEquals(all, Utf8.toString(allReadBytes)); } } |