diff options
author | Harald Musum <musum@yahooinc.com> | 2022-07-06 10:33:57 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-07-06 10:33:57 +0200 |
commit | f7d93a0f3bade34a4a02cdbd8d3942422ed65b2f (patch) | |
tree | a82b30d72ca49a93bf81facd7943dab9ebe3ed8e /configserver | |
parent | 8dae227258dde84db5116922fbc616dc1d70d3a7 (diff) |
Wire in use of compression types and flags for file distribution
VESPA_FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES is not set anywhere
yet, will be set through host-admin
Diffstat (limited to 'configserver')
3 files changed, 74 insertions, 21 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index f0c34e83713..179d8923378 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -21,6 +21,8 @@ import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.filedistribution.LazyFileReferenceData; import com.yahoo.vespa.filedistribution.LazyTemporaryStorageFileReferenceData; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; import com.yahoo.yolean.Exceptions; import java.io.File; import java.io.IOException; @@ -28,6 +30,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -35,11 +38,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip; -import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4; import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed; public class FileServer { @@ -52,6 +55,7 @@ public class FileServer { private final FileDirectory root; private final ExecutorService executor; private final FileDownloader downloader; + private final List<CompressionType> compressionTypes; // compression types to use, in preferred order // TODO: Move to filedistribution module, so that it can be used by both clients and servers private enum FileApiErrorCodes { @@ -86,21 +90,24 @@ public class FileServer { @SuppressWarnings("WeakerAccess") // Created by dependency injection @Inject - public FileServer(ConfigserverConfig configserverConfig) { + public FileServer(ConfigserverConfig configserverConfig, FlagSource flagSource) { this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())), - createFileDownloader(getOtherConfigServersInCluster(configserverConfig))); + createFileDownloader(getOtherConfigServersInCluster(configserverConfig), + compressionTypes(Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value())), + compressionTypesAsList(Flags.FILE_DISTRIBUTION_COMPRESSION_TYPES_TO_SERVE.bindTo(flagSource).value())); } // For testing only public FileServer(File rootDir) { - this(rootDir, createFileDownloader(List.of())); + this(rootDir, createFileDownloader(List.of(), Set.of(gzip)), List.of(gzip)); } - public FileServer(File rootDir, FileDownloader fileDownloader) { + FileServer(File rootDir, FileDownloader fileDownloader, List<CompressionType> compressionTypes) { this.downloader = fileDownloader; this.root = new FileDirectory(rootDir); this.executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()), new DaemonThreadFactory("file-server-")); + this.compressionTypes = compressionTypes; } boolean hasFile(String fileReference) { @@ -145,6 +152,7 @@ public class FileServer { if (file.isDirectory()) { Path tempFile = Files.createTempFile("filereferencedata", reference.value()); CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes); + log.log(Level.FINE, () -> "accepted compression types=" + acceptedCompressionTypes + ", compression type to use=" + compressionType); File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile()); return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile); } else { @@ -195,9 +203,14 @@ public class FileServer { return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND); } - // TODO: Use lz4 for testing only, add zstd when we have support for (de)compressing zstd input and output streams + /* Choose the first compression type (list is in preferred order) that matches an accepted compression type, or fail */ private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) { - return acceptedCompressionTypes.contains(lz4) ? lz4 : gzip; + for (CompressionType compressionType : compressionTypes) { + if (acceptedCompressionTypes.contains(compressionType)) + return compressionType; + } + throw new RuntimeException("Could not find a compression type that can be used. Accepted compression types: " + + acceptedCompressionTypes + ", compression types server can use: " + compressionTypes); } boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) { @@ -228,14 +241,27 @@ public class FileServer { executor.shutdown(); } - private static FileDownloader createFileDownloader(List<String> configServers) { + private static FileDownloader createFileDownloader(List<String> configServers, Set<CompressionType> acceptedCompressionTypes) { Supervisor supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true); return new FileDownloader(configServers.isEmpty() ? FileDownloader.emptyConnectionPool() : createConnectionPool(configServers, supervisor), supervisor, - timeout); + timeout, + acceptedCompressionTypes); + } + + private static LinkedHashSet<CompressionType> compressionTypes(List<String> compressionTypes) { + return compressionTypes.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + private static List<CompressionType> compressionTypesAsList(List<String> compressionTypes) { + return compressionTypes.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toList()); } private static ConnectionPool createConnectionPool(List<String> configServers, Supervisor supervisor) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java index ae4b205c06e..12972e5c465 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java @@ -18,20 +18,23 @@ import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool; import com.yahoo.vespa.filedistribution.FileDownloader; import com.yahoo.vespa.filedistribution.FileReferenceDownload; import com.yahoo.vespa.flags.FlagSource; - +import com.yahoo.vespa.flags.Flags; import java.io.File; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk; import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster; +import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType; /** * Verifies that all active sessions has an application package on local disk. * If not, the package is downloaded with file distribution. This can happen e.g. - * if a configserver is down when the application is deployed. + * if a config server is down when the application is deployed. * * @author gjoranv */ @@ -53,7 +56,10 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { this.applicationRepository = applicationRepository; this.configserverConfig = applicationRepository.configserverConfig(); this.downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())); - this.fileDownloader = createFileDownloader(configserverConfig, downloadDirectory, supervisor); + this.fileDownloader = createFileDownloader(configserverConfig, + downloadDirectory, + supervisor, + Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value()); } @Override @@ -94,14 +100,18 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer { private static FileDownloader createFileDownloader(ConfigserverConfig configserverConfig, File downloadDirectory, - Supervisor supervisor) { + Supervisor supervisor, + List<String> flagValues) { List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(configserverConfig); ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster); ConnectionPool connectionPool = (otherConfigServersInCluster.isEmpty()) ? FileDownloader.emptyConnectionPool() : new FileDistributionConnectionPool(configSourceSet, supervisor); - return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300)); + Set<CompressionType> acceptedCompressionTypes = flagValues.stream() + .map(CompressionType::valueOf) + .collect(Collectors.toSet()); + return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300), acceptedCompressionTypes); } @Override diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 9498db1d1e0..39219471bb1 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -8,8 +8,10 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; import com.yahoo.net.HostName; import com.yahoo.vespa.filedistribution.FileDownloader; +import com.yahoo.vespa.filedistribution.FileReferenceCompressor; import com.yahoo.vespa.filedistribution.FileReferenceData; import com.yahoo.vespa.filedistribution.FileReferenceDownload; +import com.yahoo.vespa.flags.InMemoryFlagSource; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -39,7 +41,7 @@ public class FileServerTest { @Before public void setup() throws IOException { File rootDir = new File(temporaryFolder.newFolder("fileserver-root").getAbsolutePath()); - fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir)); + fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), List.of(gzip, lz4)); } @Test @@ -79,11 +81,25 @@ public class FileServerTest { CompletableFuture<byte []> content = new CompletableFuture<>(); fileServer.startFileServing(new FileReference("12y"), new FileReceiver(content), Set.of(gzip)); assertEquals(new String(content.get()), "dummy-data"); + } - IOUtils.writeFile(dir + "/12z/f1", "dummy-data-2", true); - content = new CompletableFuture<>(); - fileServer.startFileServing(new FileReference("12z"), new FileReceiver(content), Set.of(gzip, lz4)); - assertEquals(new String(content.get()), "dummy-data-2"); + @Test + public void requireThatWeCanReplayDirWithLz4() throws IOException, InterruptedException, ExecutionException { + File rootDir = new File(temporaryFolder.newFolder("fileserver-root-3").getAbsolutePath()); + fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), List.of(lz4, gzip)); // prefer lz4 + File dir = getFileServerRootDir(); + IOUtils.writeFile(dir + "/subdir/12z/f1", "dummy-data-2", true); + CompletableFuture<byte []> content = new CompletableFuture<>(); + fileServer.startFileServing(new FileReference("subdir"), new FileReceiver(content), Set.of(gzip, lz4)); + + // Decompress with lz4 and check contents + var compressor = new FileReferenceCompressor(FileReferenceData.Type.compressed, lz4); + File downloadedFileCompressed = new File(dir + "/downloaded-file-compressed"); + IOUtils.writeFile(downloadedFileCompressed, content.get()); + File downloadedFileUncompressed = new File(dir + "/downloaded-file-uncompressed"); + compressor.decompress(downloadedFileCompressed, downloadedFileUncompressed); + assertTrue(downloadedFileUncompressed.isDirectory()); + assertEquals("dummy-data-2", IOUtils.readFile(new File(downloadedFileUncompressed, "12z/f1"))); } @Test @@ -124,7 +140,7 @@ public class FileServerTest { private FileServer createFileServer(ConfigserverConfig.Builder configBuilder) throws IOException { File fileReferencesDir = temporaryFolder.newFolder(); configBuilder.fileReferencesDir(fileReferencesDir.getAbsolutePath()); - return new FileServer(new ConfigserverConfig(configBuilder)); + return new FileServer(new ConfigserverConfig(configBuilder), new InMemoryFlagSource()); } private static class FileReceiver implements FileServer.Receiver { @@ -149,7 +165,8 @@ public class FileServerTest { new Supervisor(new Transport("mock")).setDropEmptyBuffers(true), downloadDirectory, Duration.ofMillis(100), - Duration.ofMillis(100)); + Duration.ofMillis(100), + Set.of(gzip)); } } |