summaryrefslogtreecommitdiffstats
path: root/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java
diff options
context:
space:
mode:
Diffstat (limited to 'logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java')
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java109
1 files changed, 107 insertions, 2 deletions
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java
index 54e47e15d8e..d1e9793ffaf 100644
--- a/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/archive/FilesArchived.java
@@ -8,10 +8,26 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
+import com.yahoo.compress.ZstdOutputStream;
+import com.yahoo.io.NativeIO;
+import com.yahoo.log.LogFileDb;
+import com.yahoo.protect.Process;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.BufferedOutputStream;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
/**
* This class holds information about all (log) files contained
@@ -28,6 +44,10 @@ public class FilesArchived {
*/
private final File root;
+ enum Compression {NONE, GZIP, ZSTD}
+ private final Compression compression;
+ private final NativeIO nativeIO = new NativeIO();
+
private final Object mutex = new Object();
// known-existing files inside the archive directory
@@ -60,8 +80,9 @@ public class FilesArchived {
/**
* Creates an instance of FilesArchive managing the given directory
*/
- public FilesArchived(File rootDir) {
+ public FilesArchived(File rootDir, String zip) {
this.root = rootDir;
+ this.compression = ("zstd".equals(zip)) ? Compression.ZSTD : Compression.GZIP;
rescan();
Thread thread = new Thread(this::run);
thread.setDaemon(true);
@@ -152,7 +173,24 @@ public class FilesArchived {
return count > 0;
}
+
private void compress(File oldFile) {
+ switch (compression) {
+ case ZSTD:
+ runCompressionZstd(nativeIO, oldFile);
+ break;
+ case GZIP:
+ compressGzip(oldFile);
+ break;
+ case NONE:
+ runCompressionNone(nativeIO, oldFile);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + compression);
+ }
+ }
+
+ private void compressGzip(File oldFile) {
File gzippedFile = new File(oldFile.getPath() + ".gz");
try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000);
FileInputStream inputStream = new FileInputStream(oldFile))
@@ -173,6 +211,32 @@ public class FilesArchived {
}
}
+ private static void runCompressionZstd(NativeIO nativeIO, File oldFile) {
+ try {
+ Path compressedFile = Paths.get(oldFile.toString() + ".zst");
+ int bufferSize = 2*1024*1024;
+ long mtime = oldFile.lastModified();
+ try (FileOutputStream fileOut = AtomicFileOutputStream.create(compressedFile);
+ ZstdOutputStream out = new ZstdOutputStream(fileOut, bufferSize);
+ FileInputStream in = new FileInputStream(oldFile))
+ {
+ pageFriendlyTransfer(nativeIO, out, fileOut.getFD(), in, bufferSize);
+ out.flush();
+ }
+ compressedFile.toFile().setLastModified(mtime);
+ oldFile.delete();
+ nativeIO.dropFileFromCache(compressedFile.toFile());
+ } catch (IOException e) {
+ log.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e);
+ } finally {
+ nativeIO.dropFileFromCache(oldFile);
+ }
+ }
+
+ private static void runCompressionNone(NativeIO nativeIO, File oldFile) {
+ nativeIO.dropFileFromCache(oldFile);
+ }
+
long sumFileSizes() {
long sum = 0;
for (LogFile lf : knownFiles) {
@@ -210,7 +274,7 @@ public class FilesArchived {
}
static class LogFile {
- public final File path;
+ public final File path;
public final String prefix;
public final int generation;
public final boolean zsuff;
@@ -245,6 +309,7 @@ public class FilesArchived {
}
private static boolean zSuffix(String name) {
if (name.endsWith(".gz")) return true;
+ if (name.endsWith(".zst")) return true;
// add other compression suffixes here
return false;
}
@@ -259,4 +324,44 @@ public class FilesArchived {
return "FilesArchived.LogFile{name="+path+" prefix="+prefix+" gen="+generation+" z="+zsuff+"}";
}
}
+
+ private static class AtomicFileOutputStream extends FileOutputStream {
+ private final Path path;
+ private final Path tmpPath;
+ private volatile boolean closed = false;
+
+ private AtomicFileOutputStream(Path path, Path tmpPath) throws FileNotFoundException {
+ super(tmpPath.toFile());
+ this.path = path;
+ this.tmpPath = tmpPath;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ super.close();
+ if (!closed) {
+ Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE);
+ closed = true;
+ }
+ }
+
+ private static AtomicFileOutputStream create(Path path) throws FileNotFoundException {
+ return new AtomicFileOutputStream(path, path.resolveSibling("." + path.getFileName() + ".tmp"));
+ }
+ }
+
+ private static void pageFriendlyTransfer(NativeIO nativeIO, OutputStream out, FileDescriptor outDescriptor, FileInputStream in, int bufferSize) throws IOException {
+ int read;
+ long totalBytesRead = 0;
+ byte[] buffer = new byte[bufferSize];
+ while ((read = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, read);
+ if (read > 0) {
+ nativeIO.dropPartialFileFromCache(in.getFD(), totalBytesRead, read, false);
+ nativeIO.dropPartialFileFromCache(outDescriptor, totalBytesRead, read, false);
+ }
+ totalBytesRead += read;
+ }
+ }
+
}