summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-07-06 10:33:57 +0200
committerHarald Musum <musum@yahooinc.com>2022-07-06 10:33:57 +0200
commitf7d93a0f3bade34a4a02cdbd8d3942422ed65b2f (patch)
treea82b30d72ca49a93bf81facd7943dab9ebe3ed8e /configserver
parent8dae227258dde84db5116922fbc616dc1d70d3a7 (diff)
Wire in use of compression types and flags for file distribution
VESPA_FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES is not set anywhere yet, will be set through host-admin
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java44
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java20
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java31
3 files changed, 74 insertions, 21 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
index f0c34e83713..179d8923378 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java
@@ -21,6 +21,8 @@ import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import com.yahoo.vespa.filedistribution.LazyFileReferenceData;
import com.yahoo.vespa.filedistribution.LazyTemporaryStorageFileReferenceData;
+import com.yahoo.vespa.flags.FlagSource;
+import com.yahoo.vespa.flags.Flags;
import com.yahoo.yolean.Exceptions;
import java.io.File;
import java.io.IOException;
@@ -28,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -35,11 +38,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster;
import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.gzip;
-import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType.lz4;
import static com.yahoo.vespa.filedistribution.FileReferenceData.Type.compressed;
public class FileServer {
@@ -52,6 +55,7 @@ public class FileServer {
private final FileDirectory root;
private final ExecutorService executor;
private final FileDownloader downloader;
+ private final List<CompressionType> compressionTypes; // compression types to use, in preferred order
// TODO: Move to filedistribution module, so that it can be used by both clients and servers
private enum FileApiErrorCodes {
@@ -86,21 +90,24 @@ public class FileServer {
@SuppressWarnings("WeakerAccess") // Created by dependency injection
@Inject
- public FileServer(ConfigserverConfig configserverConfig) {
+ public FileServer(ConfigserverConfig configserverConfig, FlagSource flagSource) {
this(new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir())),
- createFileDownloader(getOtherConfigServersInCluster(configserverConfig)));
+ createFileDownloader(getOtherConfigServersInCluster(configserverConfig),
+ compressionTypes(Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value())),
+ compressionTypesAsList(Flags.FILE_DISTRIBUTION_COMPRESSION_TYPES_TO_SERVE.bindTo(flagSource).value()));
}
// For testing only
public FileServer(File rootDir) {
- this(rootDir, createFileDownloader(List.of()));
+ this(rootDir, createFileDownloader(List.of(), Set.of(gzip)), List.of(gzip));
}
- public FileServer(File rootDir, FileDownloader fileDownloader) {
+ FileServer(File rootDir, FileDownloader fileDownloader, List<CompressionType> compressionTypes) {
this.downloader = fileDownloader;
this.root = new FileDirectory(rootDir);
this.executor = Executors.newFixedThreadPool(Math.max(8, Runtime.getRuntime().availableProcessors()),
new DaemonThreadFactory("file-server-"));
+ this.compressionTypes = compressionTypes;
}
boolean hasFile(String fileReference) {
@@ -145,6 +152,7 @@ public class FileServer {
if (file.isDirectory()) {
Path tempFile = Files.createTempFile("filereferencedata", reference.value());
CompressionType compressionType = chooseCompressionType(acceptedCompressionTypes);
+ log.log(Level.FINE, () -> "accepted compression types=" + acceptedCompressionTypes + ", compression type to use=" + compressionType);
File compressedFile = new FileReferenceCompressor(compressed, compressionType).compress(file.getParentFile(), tempFile.toFile());
return new LazyTemporaryStorageFileReferenceData(reference, file.getName(), compressed, compressedFile);
} else {
@@ -195,9 +203,14 @@ public class FileServer {
return (fileExists ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND);
}
- // TODO: Use lz4 for testing only, add zstd when we have support for (de)compressing zstd input and output streams
+ /* Choose the first compression type (list is in preferred order) that matches an accepted compression type, or fail */
private CompressionType chooseCompressionType(Set<CompressionType> acceptedCompressionTypes) {
- return acceptedCompressionTypes.contains(lz4) ? lz4 : gzip;
+ for (CompressionType compressionType : compressionTypes) {
+ if (acceptedCompressionTypes.contains(compressionType))
+ return compressionType;
+ }
+ throw new RuntimeException("Could not find a compression type that can be used. Accepted compression types: " +
+ acceptedCompressionTypes + ", compression types server can use: " + compressionTypes);
}
boolean hasFileDownloadIfNeeded(FileReferenceDownload fileReferenceDownload) {
@@ -228,14 +241,27 @@ public class FileServer {
executor.shutdown();
}
- private static FileDownloader createFileDownloader(List<String> configServers) {
+ private static FileDownloader createFileDownloader(List<String> configServers, Set<CompressionType> acceptedCompressionTypes) {
Supervisor supervisor = new Supervisor(new Transport("filedistribution-pool")).setDropEmptyBuffers(true);
return new FileDownloader(configServers.isEmpty()
? FileDownloader.emptyConnectionPool()
: createConnectionPool(configServers, supervisor),
supervisor,
- timeout);
+ timeout,
+ acceptedCompressionTypes);
+ }
+
+ private static LinkedHashSet<CompressionType> compressionTypes(List<String> compressionTypes) {
+ return compressionTypes.stream()
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ private static List<CompressionType> compressionTypesAsList(List<String> compressionTypes) {
+ return compressionTypes.stream()
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toList());
}
private static ConnectionPool createConnectionPool(List<String> configServers, Supervisor supervisor) {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
index ae4b205c06e..12972e5c465 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ApplicationPackageMaintainer.java
@@ -18,20 +18,23 @@ import com.yahoo.vespa.filedistribution.FileDistributionConnectionPool;
import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import com.yahoo.vespa.flags.FlagSource;
-
+import com.yahoo.vespa.flags.Flags;
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.fileReferenceExistsOnDisk;
import static com.yahoo.vespa.config.server.filedistribution.FileDistributionUtil.getOtherConfigServersInCluster;
+import static com.yahoo.vespa.filedistribution.FileReferenceData.CompressionType;
/**
* Verifies that all active sessions has an application package on local disk.
* If not, the package is downloaded with file distribution. This can happen e.g.
- * if a configserver is down when the application is deployed.
+ * if a config server is down when the application is deployed.
*
* @author gjoranv
*/
@@ -53,7 +56,10 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
this.applicationRepository = applicationRepository;
this.configserverConfig = applicationRepository.configserverConfig();
this.downloadDirectory = new File(Defaults.getDefaults().underVespaHome(configserverConfig.fileReferencesDir()));
- this.fileDownloader = createFileDownloader(configserverConfig, downloadDirectory, supervisor);
+ this.fileDownloader = createFileDownloader(configserverConfig,
+ downloadDirectory,
+ supervisor,
+ Flags.FILE_DISTRIBUTION_ACCEPTED_COMPRESSION_TYPES.bindTo(flagSource).value());
}
@Override
@@ -94,14 +100,18 @@ public class ApplicationPackageMaintainer extends ConfigServerMaintainer {
private static FileDownloader createFileDownloader(ConfigserverConfig configserverConfig,
File downloadDirectory,
- Supervisor supervisor) {
+ Supervisor supervisor,
+ List<String> flagValues) {
List<String> otherConfigServersInCluster = getOtherConfigServersInCluster(configserverConfig);
ConfigSourceSet configSourceSet = new ConfigSourceSet(otherConfigServersInCluster);
ConnectionPool connectionPool = (otherConfigServersInCluster.isEmpty())
? FileDownloader.emptyConnectionPool()
: new FileDistributionConnectionPool(configSourceSet, supervisor);
- return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300));
+ Set<CompressionType> acceptedCompressionTypes = flagValues.stream()
+ .map(CompressionType::valueOf)
+ .collect(Collectors.toSet());
+ return new FileDownloader(connectionPool, supervisor, downloadDirectory, Duration.ofSeconds(300), acceptedCompressionTypes);
}
@Override
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
index 9498db1d1e0..39219471bb1 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java
@@ -8,8 +8,10 @@ import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.net.HostName;
import com.yahoo.vespa.filedistribution.FileDownloader;
+import com.yahoo.vespa.filedistribution.FileReferenceCompressor;
import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
+import com.yahoo.vespa.flags.InMemoryFlagSource;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -39,7 +41,7 @@ public class FileServerTest {
@Before
public void setup() throws IOException {
File rootDir = new File(temporaryFolder.newFolder("fileserver-root").getAbsolutePath());
- fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir));
+ fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), List.of(gzip, lz4));
}
@Test
@@ -79,11 +81,25 @@ public class FileServerTest {
CompletableFuture<byte []> content = new CompletableFuture<>();
fileServer.startFileServing(new FileReference("12y"), new FileReceiver(content), Set.of(gzip));
assertEquals(new String(content.get()), "dummy-data");
+ }
- IOUtils.writeFile(dir + "/12z/f1", "dummy-data-2", true);
- content = new CompletableFuture<>();
- fileServer.startFileServing(new FileReference("12z"), new FileReceiver(content), Set.of(gzip, lz4));
- assertEquals(new String(content.get()), "dummy-data-2");
+ @Test
+ public void requireThatWeCanReplayDirWithLz4() throws IOException, InterruptedException, ExecutionException {
+ File rootDir = new File(temporaryFolder.newFolder("fileserver-root-3").getAbsolutePath());
+ fileServer = new FileServer(rootDir, new MockFileDownloader(rootDir), List.of(lz4, gzip)); // prefer lz4
+ File dir = getFileServerRootDir();
+ IOUtils.writeFile(dir + "/subdir/12z/f1", "dummy-data-2", true);
+ CompletableFuture<byte []> content = new CompletableFuture<>();
+ fileServer.startFileServing(new FileReference("subdir"), new FileReceiver(content), Set.of(gzip, lz4));
+
+ // Decompress with lz4 and check contents
+ var compressor = new FileReferenceCompressor(FileReferenceData.Type.compressed, lz4);
+ File downloadedFileCompressed = new File(dir + "/downloaded-file-compressed");
+ IOUtils.writeFile(downloadedFileCompressed, content.get());
+ File downloadedFileUncompressed = new File(dir + "/downloaded-file-uncompressed");
+ compressor.decompress(downloadedFileCompressed, downloadedFileUncompressed);
+ assertTrue(downloadedFileUncompressed.isDirectory());
+ assertEquals("dummy-data-2", IOUtils.readFile(new File(downloadedFileUncompressed, "12z/f1")));
}
@Test
@@ -124,7 +140,7 @@ public class FileServerTest {
private FileServer createFileServer(ConfigserverConfig.Builder configBuilder) throws IOException {
File fileReferencesDir = temporaryFolder.newFolder();
configBuilder.fileReferencesDir(fileReferencesDir.getAbsolutePath());
- return new FileServer(new ConfigserverConfig(configBuilder));
+ return new FileServer(new ConfigserverConfig(configBuilder), new InMemoryFlagSource());
}
private static class FileReceiver implements FileServer.Receiver {
@@ -149,7 +165,8 @@ public class FileServerTest {
new Supervisor(new Transport("mock")).setDropEmptyBuffers(true),
downloadDirectory,
Duration.ofMillis(100),
- Duration.ofMillis(100));
+ Duration.ofMillis(100),
+ Set.of(gzip));
}
}