diff options
author | Arne H Juul <arnej@yahooinc.com> | 2022-06-14 10:42:48 +0000 |
---|---|---|
committer | Arne H Juul <arnej@yahooinc.com> | 2022-06-14 10:42:48 +0000 |
commit | bbad13af86e9606147b0c5669203f605b220be17 (patch) | |
tree | 4bb8180abc3d78a7fbe704b9aea0b5afb6b9ec32 /logserver/src/main | |
parent | 48d5dde7e5c7b3b17ef2a31e5075de7031f8cf05 (diff) |
Reapply "hack in zstd compression"
This reverts commit ffc98bcd55d44f96a896e38507f7f7174989b758.
Diffstat (limited to 'logserver/src/main')
3 files changed, 115 insertions, 7 deletions
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverHandler.java b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverHandler.java index 0b44e47f183..50df160d01f 100644 --- a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverHandler.java +++ b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverHandler.java @@ -86,9 +86,9 @@ public class ArchiverHandler extends AbstractLogHandler { * Creates an ArchiverHandler which puts files under * the given root directory. */ - public ArchiverHandler(String rootDir, int maxFileSize) { + public ArchiverHandler(String rootDir, int maxFileSize, String zip) { this(); - setRootDir(rootDir); + setRootDir(rootDir, zip); this.maxFileSize = maxFileSize; } @@ -189,7 +189,7 @@ public class ArchiverHandler extends AbstractLogHandler { } } - private void setRootDir(String rootDir) { + private void setRootDir(String rootDir, String zip) { // roundabout way of setting things, but this way we can // get around Java's ineptitude for file handling (relative paths in File are broken) absoluteRootDir = new File(rootDir).getAbsolutePath(); @@ -205,7 +205,7 @@ public class ArchiverHandler extends AbstractLogHandler { log.log(Level.FINE, () -> "Created root at " + absoluteRootDir); } } - filesArchived = new FilesArchived(root); + filesArchived = new FilesArchived(root, zip); } public String toString() { diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverPlugin.java b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverPlugin.java index deb3b1adcf4..afbd12ab05f 100644 --- a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverPlugin.java +++ b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/ArchiverPlugin.java @@ -20,6 +20,8 @@ public class ArchiverPlugin implements Plugin { */ private static final String DEFAULT_MAXFILESIZE = "20971520"; + private static final String DEFAULT_COMPRESSION = "zstd"; + private final Server server = Server.getInstance(); private static final Logger log = Logger.getLogger(ArchiverPlugin.class.getName()); private ArchiverHandler archiver; @@ -52,9 +54,10 @@ public class ArchiverPlugin implements Plugin { String rootDir = config.get("dir", DEFAULT_DIR); int maxFileSize = config.getInt("maxfilesize", DEFAULT_MAXFILESIZE); String threadName = config.get("thread", getPluginName()); + String zip = config.get("compression", DEFAULT_COMPRESSION); // register log handler and flusher - archiver = new ArchiverHandler(rootDir, maxFileSize); + archiver = new ArchiverHandler(rootDir, maxFileSize, zip); server.registerLogHandler(archiver, threadName); server.registerFlusher(archiver); } diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java index 54e47e15d8e..d1e9793ffaf 100644 --- a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java +++ b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java @@ -8,10 +8,26 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.zip.GZIPOutputStream; +import com.yahoo.compress.ZstdOutputStream; +import com.yahoo.io.NativeIO; +import com.yahoo.log.LogFileDb; +import com.yahoo.protect.Process; +import com.yahoo.yolean.Exceptions; + +import java.io.BufferedOutputStream; +import java.io.FileDescriptor; +import java.io.FileNotFoundException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + /** * This class holds information about all (log) files contained @@ -28,6 +44,10 @@ public class FilesArchived { */ private final File root; + enum Compression {NONE, GZIP, ZSTD} + private final Compression compression; + private final NativeIO nativeIO = new NativeIO(); + private final Object mutex = new Object(); // known-existing files inside the archive directory @@ -60,8 +80,9 @@ public class FilesArchived { /** * Creates an instance of FilesArchive managing the given directory */ - public FilesArchived(File rootDir) { + public FilesArchived(File rootDir, String zip) { this.root = rootDir; + this.compression = ("zstd".equals(zip)) ? Compression.ZSTD : Compression.GZIP; rescan(); Thread thread = new Thread(this::run); thread.setDaemon(true); @@ -152,7 +173,24 @@ public class FilesArchived { return count > 0; } + private void compress(File oldFile) { + switch (compression) { + case ZSTD: + runCompressionZstd(nativeIO, oldFile); + break; + case GZIP: + compressGzip(oldFile); + break; + case NONE: + runCompressionNone(nativeIO, oldFile); + break; + default: + throw new IllegalArgumentException("Unknown compression " + compression); + } + } + + private void compressGzip(File oldFile) { File gzippedFile = new File(oldFile.getPath() + ".gz"); try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000); FileInputStream inputStream = new FileInputStream(oldFile)) @@ -173,6 +211,32 @@ public class FilesArchived { } } + private static void runCompressionZstd(NativeIO nativeIO, File oldFile) { + try { + Path compressedFile = Paths.get(oldFile.toString() + ".zst"); + int bufferSize = 2*1024*1024; + long mtime = oldFile.lastModified(); + try (FileOutputStream fileOut = AtomicFileOutputStream.create(compressedFile); + ZstdOutputStream out = new ZstdOutputStream(fileOut, bufferSize); + FileInputStream in = new FileInputStream(oldFile)) + { + pageFriendlyTransfer(nativeIO, out, fileOut.getFD(), in, bufferSize); + out.flush(); + } + compressedFile.toFile().setLastModified(mtime); + oldFile.delete(); + nativeIO.dropFileFromCache(compressedFile.toFile()); + } catch (IOException e) { + log.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e); + } finally { + nativeIO.dropFileFromCache(oldFile); + } + } + + private static void runCompressionNone(NativeIO nativeIO, File oldFile) { + nativeIO.dropFileFromCache(oldFile); + } + long sumFileSizes() { long sum = 0; for (LogFile lf : knownFiles) { @@ -210,7 +274,7 @@ public class FilesArchived { } static class LogFile { - public final File path; + public final File path; public final String prefix; public final int generation; public final boolean zsuff; @@ -245,6 +309,7 @@ public class FilesArchived { } private static boolean zSuffix(String name) { if (name.endsWith(".gz")) return true; + if (name.endsWith(".zst")) return true; // add other compression suffixes here return false; } @@ -259,4 +324,44 @@ public class FilesArchived { return "FilesArchived.LogFile{name="+path+" prefix="+prefix+" gen="+generation+" z="+zsuff+"}"; } } + + private static class AtomicFileOutputStream extends FileOutputStream { + private final Path path; + private final Path tmpPath; + private volatile boolean closed = false; + + private AtomicFileOutputStream(Path path, Path tmpPath) throws FileNotFoundException { + super(tmpPath.toFile()); + this.path = path; + this.tmpPath = tmpPath; + } + + @Override + public synchronized void close() throws IOException { + super.close(); + if (!closed) { + Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE); + closed = true; + } + } + + private static AtomicFileOutputStream create(Path path) throws FileNotFoundException { + return new AtomicFileOutputStream(path, path.resolveSibling("." + path.getFileName() + ".tmp")); + } + } + + private static void pageFriendlyTransfer(NativeIO nativeIO, OutputStream out, FileDescriptor outDescriptor, FileInputStream in, int bufferSize) throws IOException { + int read; + long totalBytesRead = 0; + byte[] buffer = new byte[bufferSize]; + while ((read = in.read(buffer)) >= 0) { + out.write(buffer, 0, read); + if (read > 0) { + nativeIO.dropPartialFileFromCache(in.getFD(), totalBytesRead, read, false); + nativeIO.dropPartialFileFromCache(outDescriptor, totalBytesRead, read, false); + } + totalBytesRead += read; + } + } + } |