diff options
author | Harald Musum <musum@oath.com> | 2018-04-03 07:52:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-04-03 07:52:51 +0200 |
commit | 275e3c9d402ac1bc4b02eb2b20f8554cb1b92daf (patch) | |
tree | 2cd1f855371b391fb5cdeee9035b5012c1eded0e /filedistribution/src | |
parent | 0337ed04d70c616c3da66c3b7d0df628afefb483 (diff) |
Revert "Remove method only used in tests"
Diffstat (limited to 'filedistribution/src')
5 files changed, 56 insertions, 15 deletions
diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 69559a22919..f2a82ac6ead 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -35,14 +35,15 @@ public class FileDownloader { public FileDownloader(ConnectionPool connectionPool) { this(connectionPool, new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), + new File(Defaults.getDefaults().underVespaHome("var/db/vespa/filedistribution")), Duration.ofMinutes(15), Duration.ofSeconds(10)); } - FileDownloader(ConnectionPool connectionPool, File downloadDirectory, Duration timeout, Duration sleepBetweenRetries) { + FileDownloader(ConnectionPool connectionPool, File downloadDirectory, File tmpDirectory, Duration timeout, Duration sleepBetweenRetries) { this.downloadDirectory = downloadDirectory; this.timeout = timeout; - this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, timeout, sleepBetweenRetries); + this.fileReferenceDownloader = new FileReferenceDownloader(downloadDirectory, tmpDirectory, connectionPool, timeout, sleepBetweenRetries); } public Optional<File> getFile(FileReference fileReference) { @@ -77,6 +78,10 @@ public class FileDownloader { } } + void receiveFile(FileReferenceData fileReferenceData) { + fileReferenceDownloader.receiveFile(fileReferenceData); + } + double downloadStatus(FileReference fileReference) { return fileReferenceDownloader.downloadStatus(fileReference.value()); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 26a1cad2220..83cf4e1ad80 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -185,6 +185,38 @@ public class FileReceiver { return methods; } + void receiveFile(FileReferenceData fileReferenceData) { + long xxHashFromContent = fileReferenceData.xxhash(); + if (xxHashFromContent != fileReferenceData.xxhash()) { + throw new RuntimeException("xxhash from content (" + xxHashFromContent + ") is not equal to xxhash in request (" + fileReferenceData.xxhash() + ")"); + } + + File fileReferenceDir = new File(downloadDirectory, fileReferenceData.fileReference().value()); + // file might be a directory (and then type is compressed) + File file = new File(fileReferenceDir, fileReferenceData.filename()); + try { + File tempDownloadedDir = Files.createTempDirectory(tmpDirectory.toPath(), "downloaded").toFile(); + File tempFile = new File(tempDownloadedDir, fileReferenceData.filename()); + Files.write(tempFile.toPath(), fileReferenceData.content().array()); + + // Unpack if necessary + if (fileReferenceData.type() == FileReferenceData.Type.compressed) { + File decompressedDir = Files.createTempDirectory(tempDownloadedDir.toPath(), "decompressed").toFile(); + log.log(LogLevel.DEBUG, () -> "Compressed file, unpacking " + tempFile + " to " + decompressedDir); + CompressedFileReference.decompress(tempFile, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); + } else { + log.log(LogLevel.DEBUG, () -> "Uncompressed file, moving to " + file.getAbsolutePath()); + Files.createDirectories(fileReferenceDir.toPath()); + moveFileToDestination(tempFile, file); + } + downloader.completedDownloading(fileReferenceData.fileReference(), file); + } catch (IOException e) { + log.log(LogLevel.ERROR, "Failed writing file: " + e.getMessage(), e); + throw new RuntimeException("Failed writing file: ", e); + } + } + private static void moveFileToDestination(File tempFile, File destination) { try { Files.move(tempFile.toPath(), destination.toPath()); diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 2c41225495d..1008df229f1 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -43,11 +43,14 @@ public class FileReferenceDownloader { private final Map<FileReference, Double> downloadStatus = new HashMap<>(); // between 0 and 1 private final Duration downloadTimeout; private final Duration sleepBetweenRetries; + private final FileReceiver fileReceiver; - FileReferenceDownloader(ConnectionPool connectionPool, Duration timeout, Duration sleepBetweenRetries) { + FileReferenceDownloader(File downloadDirectory, File tmpDirectory, ConnectionPool connectionPool, + Duration timeout, Duration sleepBetweenRetries) { this.connectionPool = connectionPool; this.downloadTimeout = timeout; this.sleepBetweenRetries = sleepBetweenRetries; + this.fileReceiver = new FileReceiver(connectionPool.getSupervisor(), this, downloadDirectory, tmpDirectory); } private void startDownload(Duration timeout, FileReferenceDownload fileReferenceDownload) { @@ -83,6 +86,10 @@ public class FileReferenceDownloader { downloadExecutor.submit(() -> startDownload(downloadTimeout, fileReferenceDownload)); } + void receiveFile(FileReferenceData fileReferenceData) { + fileReceiver.receiveFile(fileReferenceData); + } + void completedDownloading(FileReference fileReference, File file) { synchronized (downloads) { FileReferenceDownload download = downloads.get(fileReference); diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 123fa51cdab..e0ef2ecf7e4 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -13,14 +13,11 @@ import com.yahoo.jrt.Transport; import com.yahoo.text.Utf8; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; @@ -46,7 +43,7 @@ public class FileDownloaderTest { downloadDir = Files.createTempDirectory("filedistribution").toFile(); tempDir = Files.createTempDirectory("download").toFile(); connection = new MockConnection(); - fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofSeconds(2), Duration.ofMillis(100)); + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(2), Duration.ofMillis(100)); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -152,7 +149,7 @@ public class FileDownloaderTest { @Test public void getFileWhenConnectionError() throws IOException { - fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofSeconds(3), Duration.ofMillis(100)); + fileDownloader = new FileDownloader(connection, downloadDir, tempDir, Duration.ofSeconds(3), Duration.ofMillis(100)); File downloadDir = fileDownloader.downloadDirectory(); int timesToFail = 2; @@ -189,7 +186,7 @@ public class FileDownloaderTest { File downloadDir = Files.createTempDirectory("filedistribution").toFile(); MockConnection connectionPool = new MockConnection(); connectionPool.setResponseHandler(new MockConnection.WaitResponseHandler(timeout.plus(Duration.ofMillis(1000)))); - FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, timeout, sleepBetweenRetries); + FileDownloader fileDownloader = new FileDownloader(connectionPool, downloadDir, tempDir, timeout, sleepBetweenRetries); FileReference foo = new FileReference("foo"); FileReference bar = new FileReference("bar"); fileDownloader.queueForAsyncDownload(new FileReferenceDownload(foo)); @@ -228,11 +225,7 @@ public class FileDownloaderTest { } private void receiveFile(FileReference fileReference, String filename, FileReferenceData.Type type, byte[] content) { - XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); - FileReceiver.Session session = - new FileReceiver.Session(downloadDir, tempDir, 1, fileReference, type, filename, content.length); - session.addPart(0, content); - session.close(hasher.hash(ByteBuffer.wrap(content), 0)); + fileDownloader.receiveFile(new FileReferenceDataBlob(fileReference, filename, type, content)); } private static class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java index 8dda0bcce66..78fc094a9ef 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -14,10 +14,14 @@ import org.junit.rules.TemporaryFolder; import static org.junit.Assert.assertEquals; import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; public class FileReceiverTest { private File root; @@ -88,7 +92,7 @@ public class FileReceiverTest { assertEquals(all, Utf8.toString(allReadBytes)); } - private void transferCompressedData(FileReference ref, String fileName, byte[] data) { + private void transferCompressedData(FileReference ref, String fileName, byte[] data) throws IOException { FileReceiver.Session session = new FileReceiver.Session(root, tempDir, 1, ref, FileReferenceData.Type.compressed, fileName, data.length); session.addPart(0, data); |