diff options
Diffstat (limited to 'filedistribution')
8 files changed, 51 insertions, 36 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java index d09cf17b9e3..2e58455bc39 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDistributionRpcServer.java @@ -118,7 +118,7 @@ public class FileDistributionRpcServer { List<FileReference> fileReferences = Stream.of(fileReferenceStrings) .map(FileReference::new) .collect(Collectors.toList()); - downloader.queueForDownload(fileReferences); + downloader.queueForAsyncDownload(fileReferences); req.returnValues().add(new Int32Value(0)); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 6610893d8ae..05bcaacb107 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -67,12 +67,19 @@ public class FileDownloader { } else { log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found in " + directory.getAbsolutePath() + ", starting download"); - return queueForDownload(fileReference, timeout); + return queueForAsyncDownload(fileReference, timeout); } } - public void queueForDownload(List<FileReference> fileReferences) { - fileReferences.forEach(this::queueForDownload); + // Start downloading, but there is no Future used get file being downloaded + public void queueForAsyncDownload(List<FileReference> fileReferences) { + fileReferences.forEach(fileReference -> { + if (fileReferenceDownloader.isDownloading(fileReference)) { + log.log(LogLevel.DEBUG, "Already downloading '" + fileReference.value() + "'"); + } else { + queueForAsyncDownload(fileReference).cancel(false); + } + }); } void receiveFile(FileReferenceData fileReferenceData) { @@ -107,24 +114,20 @@ public class FileDownloader { return Optional.empty(); } - private synchronized Future<Optional<File>> queueForDownload(FileReference fileReference, Duration timeout) { + private synchronized Future<Optional<File>> queueForAsyncDownload(FileReference fileReference, Duration timeout) { Future<Optional<File>> inProgress = fileReferenceDownloader.addDownloadListener(fileReference, () -> getFile(fileReference)); if (inProgress != null) { log.log(LogLevel.DEBUG, "Already downloading '" + fileReference.value() + "'"); return inProgress; } - Future<Optional<File>> future = queueForDownload(fileReference); + Future<Optional<File>> future = queueForAsyncDownload(fileReference); log.log(LogLevel.INFO, "Queued '" + fileReference.value() + "' for download with timeout " + timeout); return future; } - // We don't care about the future in this call - private Future<Optional<File>> queueForDownload(FileReference fileReference) { - return queueForDownload(new FileReferenceDownload(fileReference, SettableFuture.create())); - } - - private Future<Optional<File>> queueForDownload(FileReferenceDownload fileReferenceDownload) { + private Future<Optional<File>> queueForAsyncDownload(FileReference fileReference) { + FileReferenceDownload fileReferenceDownload = new FileReferenceDownload(fileReference, SettableFuture.create()); fileReferenceDownloader.addToDownloadQueue(fileReferenceDownload); return fileReferenceDownload.future(); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index c4e010e57d5..d57ce4ca5de 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -23,6 +23,13 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; +/** + * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata. + * Uses the same Supervisor as the original caller that requests files, so communication uses the same + * connection in both directions. + * + * @author baldersheim + */ public class FileReceiver { private final static Logger log = Logger.getLogger(FileReceiver.class.getName()); @@ -125,6 +132,10 @@ public class FileReceiver { } return file; } + + double percentageReceived() { + return (double)currentFileSize/(double)fileSize; + } } FileReceiver(Supervisor supervisor, FileReferenceDownloader downloader, File downloadDirectory, File tmpDirectory) { @@ -287,6 +298,9 @@ public class FileReceiver { log.severe("Got exception + " + e); retval = 1; } + double completeness = (double) session.currentFileSize / (double) session.fileSize; + log.log(LogLevel.DEBUG, String.format("%.1f percent of '%s' downloaded", completeness * 100, reference.value())); + downloader.setDownloadStatus(reference, completeness); req.returnValues().add(new Int32Value(retval)); } @@ -306,12 +320,12 @@ public class FileReceiver { req.returnValues().add(new Int32Value(retval)); } - private final Session getSession(Integer sessionId) { + private Session getSession(Integer sessionId) { synchronized (sessions) { return sessions.get(sessionId); } } - private static final int verifySession(Session session, int sessionId, FileReference reference) { + private static int verifySession(Session session, int sessionId, FileReference reference) { if (session == null) { log.severe("session-id " + sessionId + " does not exist."); return 1; diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java index 9de4c1fcd5b..048287f0892 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownload.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.filedistribution; -import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.SettableFuture; import com.yahoo.config.FileReference; diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 7793e25f7b2..b8402c8aacf 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -18,7 +18,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -50,9 +49,8 @@ public class FileReferenceDownloader { this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory); } - private void startDownload(FileReference fileReference, Duration timeout, - FileReferenceDownload fileReferenceDownload) - { + private void startDownload(Duration timeout, FileReferenceDownload fileReferenceDownload) { + FileReference fileReference = fileReferenceDownload.fileReference(); synchronized (downloads) { downloads.put(fileReference, fileReferenceDownload); downloadStatus.put(fileReference, 0.0); @@ -67,7 +65,7 @@ public class FileReferenceDownloader { Thread.sleep(10); } } - catch (InterruptedException | ExecutionException e) {} + catch (InterruptedException e) {} } if ( !downloadStarted) { @@ -80,7 +78,7 @@ public class FileReferenceDownloader { void addToDownloadQueue(FileReferenceDownload fileReferenceDownload) { log.log(LogLevel.DEBUG, "Will download file reference '" + fileReferenceDownload.fileReference().value() + "'"); - downloadExecutor.submit(() -> startDownload(fileReferenceDownload.fileReference(), downloadTimeout, fileReferenceDownload)); + downloadExecutor.submit(() -> startDownload(downloadTimeout, fileReferenceDownload)); } void receiveFile(FileReferenceData fileReferenceData) { @@ -100,7 +98,7 @@ public class FileReferenceDownloader { } } - private boolean startDownloadRpc(FileReference fileReference) throws ExecutionException, InterruptedException { + private boolean startDownloadRpc(FileReference fileReference) { Connection connection = connectionPool.getCurrent(); Request request = new Request("filedistribution.serveFile"); request.parameters().add(new StringValue(fileReference.value())); @@ -173,8 +171,12 @@ public class FileReferenceDownloader { } void setDownloadStatus(String file, double completeness) { + setDownloadStatus(new FileReference(file), completeness); + } + + void setDownloadStatus(FileReference fileReference, double completeness) { synchronized (downloads) { - downloadStatus.put(new FileReference(file), completeness); + downloadStatus.put(fileReference, completeness); } } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 60478550084..d2da020539a 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -189,7 +189,7 @@ public class FileDownloaderTest { FileReference foo = new FileReference("foo"); FileReference bar = new FileReference("bar"); List<FileReference> fileReferences = Arrays.asList(foo, bar); - fileDownloader.queueForDownload(fileReferences); + fileDownloader.queueForAsyncDownload(fileReferences); // Verify download status assertDownloadStatus(fileDownloader, foo, 0.0); diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java index 34ce53ad4c8..762817c27ef 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -33,7 +33,6 @@ public class FileReceiverTest { @Test public void receiveMultiPartFile() throws IOException{ - String [] parts = new String[3]; parts[0] = "first part\n"; parts[1] = "second part\n"; @@ -43,16 +42,12 @@ public class FileReceiverTest { sb.append(s); } String all = sb.toString(); - String allRead = transferParts(new FileReference("ref-a"), "myfile-1", all, 1); - assertEquals(all, allRead); - allRead = transferParts(new FileReference("ref-a"), "myfile-2", all, 2); - assertEquals(all, allRead); - allRead = transferParts(new FileReference("ref-a"), "myfile-3", all, 3); - assertEquals(all, allRead); - + transferPartsAndAssert(new FileReference("ref-a"), "myfile-1", all, 1); + transferPartsAndAssert(new FileReference("ref-a"), "myfile-2", all, 2); + transferPartsAndAssert(new FileReference("ref-a"), "myfile-3", all, 3); } - private String transferParts(FileReference ref, String fileName, String all, int numParts) throws IOException { + private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException { byte [] allContent = Utf8.toBytes(all); FileReceiver.Session session = new FileReceiver.Session(root, tempDir, 1, ref, @@ -63,12 +58,14 @@ public class FileReceiverTest { byte [] buf = new byte[Math.min(partSize, allContent.length - pos)]; bb.get(buf); session.addPart(i, buf); + // Small numbers, so need a large delta + assertEquals((double)(i+1)/(double)numParts, session.percentageReceived(), 0.04); pos += buf.length; } - File file = session.close(hasher.hash(ByteBuffer.wrap(allContent), 0)); + File file = session.close(hasher.hash(ByteBuffer.wrap(Utf8.toBytes(all)), 0)); byte [] allReadBytes = Files.readAllBytes(file.toPath()); file.delete(); - return Utf8.toString(allReadBytes); + assertEquals(all, Utf8.toString(allReadBytes)); } } diff --git a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp index e07f0684584..de410289ec0 100644 --- a/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp +++ b/filedistribution/src/vespa/filedistribution/model/zkfacade.cpp @@ -4,7 +4,7 @@ #include <vespa/vespalib/net/socket_address.h> #include <vespa/filedistribution/common/logfwd.h> #include <vespa/defaults.h> -#include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <zookeeper/zookeeper.h> #include <sstream> |