diff options
8 files changed, 104 insertions, 46 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 1edcd3c4d31..14d99cc17af 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -13,14 +13,18 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.defaults.Defaults; -import com.yahoo.vespa.filedistribution.FileReferenceCompressor; import com.yahoo.vespa.filedistribution.EmptyFileReferenceData; import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool; import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileReferenceCompressor; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import com.yahoo.vespa.filedistribution.LazyTemporaryStorageFileReferenceData; +import com.yahoo.vespa.flags.BooleanFlag; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; +import com.yahoo.vespa.flags.StringFlag; import com.yahoo.yolean.Exceptions; import java.io.File; import java.io.IOException; @@ -36,7 +40,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; -import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; public class FileServer { @@ -48,6 +51,8 @@ public class FileServer { private final FileDirectory root; private final ExecutorService executor; private final FileDownloader downloader; + private final StringFlag compressionAlgorithm; + private final BooleanFlag compressSingleFiles; private enum FileApiErrorCodes { OK(0, "OK"), @@ -80,21 +85,24 @@ public class FileServer { @SuppressWarnings("WeakerAccess") // Created by dependency injection @Inject - public FileServer(ConfigserverConfig configserverConfig) { + public FileServer(ConfigserverConfig configserverConfig, FlagSource flagSource) { this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())), - createFileDownloader(getOtherConfigServersInCluster(configserverConfig))); + createFileDownloader(getOtherConfigServersInCluster(configserverConfig)), + flagSource); } // For testing only - public FileServer(File rootDir) { - this(rootDir, createFileDownloader(List.of())); + public FileServer(File rootDir, FlagSource flagSource) { + this(rootDir, createFileDownloader(List.of()), flagSource); } - public FileServer(File rootDir, FileDownloader fileDownloader) { + public FileServer(File rootDir, FileDownloader fileDownloader, FlagSource flagSource) { this.downloader = fileDownloader; this.root = new FileDirectory(rootDir); this.executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), new DaemonThreadFactory("file-server-")); + this.compressionAlgorithm = Flags.FILE_DISTRIBUTION_COMPRESSION_ALGORITHM.bindTo(flagSource); + this.compressSingleFiles = Flags.FILE_DISTRIBUTION_COMPRESS_SINGLE_FILES.bindTo(flagSource); } boolean hasFile(String fileReference) { @@ -147,17 +155,27 @@ public class FileServer { } private FileReferenceData readFileReferenceData(FileReference reference) throws IOException { + FileReferenceData.Type type = FileReferenceData.from(compressionAlgorithm.value()); File file = root.getFile(reference); - + String fileName = file.getName(); if (file.isDirectory()) { - Path tempFile = Files.createTempFile("filereferencedata", reference.value()); - File compressedFile = new FileReferenceCompressor(compressed).compress(file.getParentFile(), tempFile.toFile()); - return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile); + return createFileReferenceData(file.getParentFile(), reference, type, fileName); + } else if (compressSingleFiles.value()) { + return createFileReferenceData(file, reference, type, fileName); } else { - return new LazyFileReferenceData(reference, file.getName(), FileReferenceData.Type.file, file); + return new LazyFileReferenceData(reference, fileName, FileReferenceData.Type.file, file); } } + LazyTemporaryStorageFileReferenceData createFileReferenceData(File file, + FileReference reference, + FileReferenceData.Type type, + String fileName) throws IOException { + Path tempFile = Files.createTempFile("filereferencedata", reference.value()); + File compressedFile = new FileReferenceCompressor(type).compress(file.getParentFile(), tempFile.toFile()); + return new LazyTemporaryStorageFileReferenceData(reference, fileName, type, compressedFile); + } + public void serveFile(String fileReference, boolean downloadFromOtherSourceIfNotFound, Request request, Receiver receiver) { if (executor instanceof ThreadPoolExecutor) log.log(Level.FINE, () -> "Active threads: " + ((ThreadPoolExecutor) executor).getActiveCount()); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 67c40f94b6a..216a81af0ab 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -10,6 +10,7 @@ import com.yahoo.net.HostName; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; +import com.yahoo.vespa.flags.InMemoryFlagSource; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -37,7 +38,7 @@ public class FileServerTest { @Before public void setup() throws IOException { File rootDir = new File(temporaryFolder.newFolder("fileserver-root").getAbsolutePath()); - fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir)); + fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), new InMemoryFlagSource()); } @Test @@ -117,7 +118,7 @@ public class FileServerTest { private FileServer createFileServer(ConfigserverConfig.Builder configBuilder) throws IOException { File fileReferencesDir = temporaryFolder.newFolder(); configBuilder.fileReferencesDir(fileReferencesDir.getAbsolutePath()); - return new FileServer(new ConfigserverConfig(configBuilder)); + return new FileServer(new ConfigserverConfig(configBuilder), new InMemoryFlagSource()); } private static class FileReceiver implements FileServer.Receiver { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpcServer.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpcServer.java index 3272689473e..77f67604aad 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpcServer.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpcServer.java @@ -11,6 +11,7 @@ import com.yahoo.vespa.config.server.host.ConfigRequestHostLivenessTracker; import com.yahoo.vespa.config.server.host.HostRegistry; import com.yahoo.vespa.config.server.monitoring.Metrics; import com.yahoo.vespa.config.server.rpc.security.NoopRpcAuthorizer; +import com.yahoo.vespa.flags.InMemoryFlagSource; import java.io.File; import java.time.Duration; @@ -39,7 +40,7 @@ public class MockRpcServer extends RpcServer { Metrics.createTestMetrics(), new HostRegistry(), new ConfigRequestHostLivenessTracker(), - new FileServer(tempDir), + new FileServer(tempDir, new InMemoryFlagSource()), new NoopRpcAuthorizer(), new RpcRequestHandlerProvider()); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java index 40ed20b7969..73312710c45 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java @@ -112,20 +112,21 @@ public class RpcTester implements AutoCloseable { } RpcServer createRpcServer(ConfigserverConfig config) throws IOException { + InMemoryFlagSource flagSource = new InMemoryFlagSource(); RpcServer rpcServer = new RpcServer(config, - new SuperModelRequestHandler(new TestConfigDefinitionRepo(), + new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, new SuperModelManager( config, Zone.defaultZone(), new MemoryGenerationCounter(), - new InMemoryFlagSource())), - Metrics.createTestMetrics(), - hostRegistry, - hostLivenessTracker, - new FileServer(temporaryFolder.newFolder()), - new NoopRpcAuthorizer(), - new RpcRequestHandlerProvider()); + flagSource)), + Metrics.createTestMetrics(), + hostRegistry, + hostLivenessTracker, + new FileServer(temporaryFolder.newFolder(), flagSource), + new NoopRpcAuthorizer(), + new RpcRequestHandlerProvider()); rpcServer.setUpGetConfigHandlers(); return rpcServer; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java index 81a8944149c..da52caefb7d 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReceiver.java @@ -22,8 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; -import static com.yahoo.vespa.filedistribution.FileReferenceData.Type; - /** * When asking for a file reference, this handles RPC callbacks from config server with file data and metadata. * Uses the same Supervisor as the original caller that requests files, so communication uses the same @@ -109,32 +107,40 @@ public class FileReceiver { verifyHash(hash); File file = new File(fileReferenceDir, fileName); - File decompressedDir = null; try { - if (fileType == Type.file) { - try { + switch (fileType) { + case file: + log.log(Level.FINE, () -> "Uncompressed file reference " + fileName + ", storing as " + file.getAbsolutePath()); Files.createDirectories(fileReferenceDir.toPath()); - } catch (IOException e) { - log.log(Level.SEVERE, "Failed creating directory (" + fileReferenceDir.toPath() + "): " + e.getMessage(), e); - throw new RuntimeException("Failed creating directory (" + fileReferenceDir.toPath() + "): ", e); - } - log.log(Level.FINE, () -> "Uncompressed file, moving to " + file.getAbsolutePath()); - moveFileToDestination(inprogressFile, file); - } else { - decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); - new FileReferenceCompressor(fileType).decompress(inprogressFile, decompressedDir); - moveFileToDestination(decompressedDir, fileReferenceDir); + moveFileToDestination(inprogressFile, file); + break; + case compressed: + log.log(Level.FINE, () -> "Compressed file reference (directory)" + fileName + ", storing in " + fileReferenceDir.getAbsolutePath()); + decompress(fileType, fileReferenceDir, inprogressFile); + break; + default: + throw new RuntimeException("Unknown file type " + fileType); } } catch (IOException e) { log.log(Level.SEVERE, "Failed writing file: " + e.getMessage(), e); throw new RuntimeException("Failed writing file: ", e); } finally { deletePath(inprogressFile); - deletePath(decompressedDir); } return file; } + void decompress(FileReferenceData.Type fileType, File fileReferenceDir, File file) throws IOException { + File decompressedDir = null; + try { + decompressedDir = Files.createTempDirectory(tmpDir.toPath(), "archive").toFile(); + new FileReferenceCompressor(fileType).decompress(file, decompressedDir); + moveFileToDestination(decompressedDir, fileReferenceDir); + } finally { + deletePath(decompressedDir); + } + } + double percentageReceived() { return (double)currentFileSize/(double)fileSize; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java index c36bcd22606..d0957297f79 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceCompressor.java @@ -47,13 +47,16 @@ public class FileReferenceCompressor { return outputFile; } - public File compress(File directory, File outputFile) throws IOException { - return compress(directory, - Files.find(Paths.get(directory.getAbsolutePath()), - recurseDepth, - (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) - .map(Path::toFile).collect(Collectors.toList()), - outputFile); + public File compress(File file, File outputFile) throws IOException { + if (file.isDirectory()) + return compress(file, + Files.find(Paths.get(file.getAbsolutePath()), + recurseDepth, + (p, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile).collect(Collectors.toList()), + outputFile); + else + return compress(file.getParentFile(), List.of(file), outputFile); } public void decompress(File inputFile, File outputDir) throws IOException { diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java index 03f8d184f94..f1ac8edaabb 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceData.java @@ -64,6 +64,17 @@ public abstract class FileReferenceData { */ public abstract void close(); + public static Type from(String type) { + switch (type) { + case "none": + return Type.file; + case "gzip": + return Type.compressed; + default: + throw new RuntimeException("Unknown compression type " + type); + } + } + @Override public String toString() { return fileReference.value() + " (" + filename + "), " + type.name(); } diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java index 5c15f945ae3..4996a439cfe 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileReceiverTest.java @@ -15,6 +15,8 @@ import java.io.FileWriter; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; import static org.junit.Assert.assertEquals; @@ -65,6 +67,21 @@ public class FileReceiverTest { assertEquals("2", IOUtils.readFile(new File(downloadDir, "b"))); } + @Test + public void receiveCompressedSingleFile() throws IOException{ + File inputFile = temporaryFolder.newFile("a-file"); + FileWriter writer = new FileWriter(inputFile); + String content = IntStream.range(1, 1000).mapToObj(a -> "surely this can be compressed").collect(Collectors.joining(",")); + writer.write(content); + writer.close(); + + File tempFile = temporaryFolder.newFile(); + File file = new FileReferenceCompressor(compressed).compress(inputFile, tempFile); + transferCompressedData(new FileReference("ref"), "a-file", IOUtils.readFileBytes(file)); + File downloadDir = new File(root, "ref"); + assertEquals(content, IOUtils.readFile(new File(downloadDir, "a-file"))); + } + private void transferPartsAndAssert(FileReference ref, String fileName, String all, int numParts) throws IOException { byte [] allContent = Utf8.toBytes(all); |