summaryrefslogtreecommitdiffstats
path: root/filedistribution
diff options
context:
space:
mode:
authorHarald Musum <musum@oath.com>2017-12-13 23:03:27 +0100
committerHarald Musum <musum@oath.com>2017-12-13 23:03:27 +0100
commitc000686ee70e92af50d434de7c61a9d3de22db27 (patch)
tree84c2b050ca344623a8e81a25bdfcac39884eab6e /filedistribution
parent99e5cde1894e5fdc5a0dd75c7ca7013c473d3c7f (diff)
Update download status after each part received
Diffstat (limited to 'filedistribution')
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java18
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java1
-rw-r--r--filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java18
-rw-r--r--filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java19
4 files changed, 34 insertions, 22 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
index c4e010e57d5..d57ce4ca5de 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java
@@ -23,6 +23,13 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
+/**
+ * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata.
+ * Uses the same Supervisor as the original caller that requests files, so communication uses the same
+ * connection in both directions.
+ *
+ * @author baldersheim
+ */
public class FileReceiver {
private final static Logger log = Logger.getLogger(FileReceiver.class.getName());
@@ -125,6 +132,10 @@ public class FileReceiver {
}
return file;
}
+
+ double percentageReceived() {
+ return (double)currentFileSize/(double)fileSize;
+ }
}
FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory, File tmpDirectory) {
@@ -287,6 +298,9 @@ public class FileReceiver {
log.severe("Got exception + " + e);
retval = 1;
}
+ double completeness = (double) session.currentFileSize / (double) session.fileSize;
+ log.log(LogLevel.DEBUG, String.format("%.1f percent of '%s' downloaded", completeness * 100, reference.value()));
+ downloader.setDownloadStatus(reference, completeness);
req.returnValues().add(new Int32Value(retval));
}
@@ -306,12 +320,12 @@ public class FileReceiver {
req.returnValues().add(new Int32Value(retval));
}
- private final Session getSession(Integer sessionId) {
+ private Session getSession(Integer sessionId) {
synchronized (sessions) {
return sessions.get(sessionId);
}
}
- private static final int verifySession(Session session, int sessionId, FileReference reference) {
+ private static int verifySession(Session session, int sessionId, FileReference reference) {
if (session == null) {
log.severe("session-id " + sessionId + " does not exist.");
return 1;
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
index 9de4c1fcd5b..048287f0892 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java
@@ -2,7 +2,6 @@
package com.yahoo.vespa.filedistribution;
-import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.config.FileReference;
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
index 7793e25f7b2..b8402c8aacf 100644
--- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
+++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java
@@ -18,7 +18,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
@@ -50,9 +49,8 @@ public class FileReferenceDownloader {
this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory);
}
- private void startDownload(FileReference fileReference, Duration timeout,
- FileReferenceDownload fileReferenceDownload)
- {
+ private void startDownload(Duration timeout, FileReferenceDownload fileReferenceDownload) {
+ FileReference fileReference = fileReferenceDownload.fileReference();
synchronized (downloads) {
downloads.put(fileReference, fileReferenceDownload);
downloadStatus.put(fileReference, 0.0);
@@ -67,7 +65,7 @@ public class FileReferenceDownloader {
Thread.sleep(10);
}
}
- catch (InterruptedException | ExecutionException e) {}
+ catch (InterruptedException e) {}
}
if ( !downloadStarted) {
@@ -80,7 +78,7 @@ public class FileReferenceDownloader {
void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) {
log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'");
- downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload));
+ downloadExecutor.submit(() -> startDownload(downloadTimeout, fileReferenceDownload));
}
void receiveFile(FileReferenceData fileReferenceData) {
@@ -100,7 +98,7 @@ public class FileReferenceDownloader {
}
}
- private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException {
+ private boolean startDownloadRpc(FileReference fileReference) {
Connection connection = connectionPool.getCurrent();
Request request = new Request("filedistribution.serveFile");
request.parameters().add(new StringValue(fileReference.value()));
@@ -173,8 +171,12 @@ public class FileReferenceDownloader {
}
void setDownloadStatus(String file, double completeness) {
+ setDownloadStatus(new FileReference(file), completeness);
+ }
+
+ void setDownloadStatus(FileReference fileReference, double completeness) {
synchronized (downloads) {
- downloadStatus.put(new FileReference(file), completeness);
+ downloadStatus.put(fileReference, completeness);
}
}
diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
index 34ce53ad4c8..762817c27ef 100644
--- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
+++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java
@@ -33,7 +33,6 @@ public class FileReceiverTest {
@Test
public void receiveMultiPartFile() throws IOException{
-
String [] parts = new String[3];
parts[0] = "first part\n";
parts[1] = "second part\n";
@@ -43,16 +42,12 @@ public class FileReceiverTest {
sb.append(s);
}
String all = sb.toString();
- String allRead = transferParts(new FileReference("ref-a"), "myfile-1", all, 1);
- assertEquals(all, allRead);
- allRead = transferParts(new FileReference("ref-a"), "myfile-2", all, 2);
- assertEquals(all, allRead);
- allRead = transferParts(new FileReference("ref-a"), "myfile-3", all, 3);
- assertEquals(all, allRead);
-
+ transferPartsAndAssert(new FileReference("ref-a"), "myfile-1", all, 1);
+ transferPartsAndAssert(new FileReference("ref-a"), "myfile-2", all, 2);
+ transferPartsAndAssert(new FileReference("ref-a"), "myfile-3", all, 3);
}
- private String transferParts(FileReference ref, String fileName, String all, int numParts) throws IOException {
+ private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException {
byte [] allContent = Utf8.toBytes(all);
FileReceiver.Session session = new FileReceiver.Session(root, tempDir, 1, ref,
@@ -63,12 +58,14 @@ public class FileReceiverTest {
byte [] buf = new byte[Math.min(partSize, allContent.length - pos)];
bb.get(buf);
session.addPart(i, buf);
+ // Small numbers, so need a large delta
+ assertEquals((double)(i+1)/(double)numParts, session.percentageReceived(), 0.04);
pos += buf.length;
}
- File file = session.close(hasher.hash(ByteBuffer.wrap(allContent), 0));
+ File file = session.close(hasher.hash(ByteBuffer.wrap(Utf8.toBytes(all)), 0));
byte [] allReadBytes = Files.readAllBytes(file.toPath());
file.delete();
- return Utf8.toString(allReadBytes);
+ assertEquals(all, Utf8.toString(allReadBytes));
}
}