From 17349ba3ab25ff89ba449244242f9cacb4846bb1 Mon Sep 17 00:00:00 2001 From: gjoranv Date: Thu, 18 Mar 2021 19:51:09 +0100 Subject: Add main java source from jdisc_http_service. --- .../yahoo/container/logging/LogFileHandler.java | 563 +++++++++++++++++++++ 1 file changed, 563 insertions(+) create mode 100644 container-core/src/main/java/com/yahoo/container/logging/LogFileHandler.java (limited to 'container-core/src/main/java/com/yahoo/container/logging/LogFileHandler.java') diff --git a/container-core/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/container-core/src/main/java/com/yahoo/container/logging/LogFileHandler.java new file mode 100644 index 00000000000..0f2a9e42eb8 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/logging/LogFileHandler.java @@ -0,0 +1,563 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.logging; + +import com.yahoo.compress.ZstdOuputStream; +import com.yahoo.io.NativeIO; +import com.yahoo.log.LogFileDb; +import com.yahoo.protect.Process; +import com.yahoo.yolean.Exceptions; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.zip.GZIPOutputStream; + +/** + * Implements log file naming/rotating logic for container logs. + * + * @author Bob Travis + * @author bjorncs + */ +class LogFileHandler { + + enum Compression {NONE, GZIP, ZSTD} + + private final static Logger logger = Logger.getLogger(LogFileHandler.class.getName()); + private final BlockingQueue> logQueue; + final LogThread logThread; + + @FunctionalInterface private interface Pollable { Operation poll() throws InterruptedException; } + + LogFileHandler(Compression compression, String filePattern, String rotationTimes, String symlinkName, int queueSize, + String threadName, LogWriter logWriter) { + this(compression, filePattern, calcTimesMinutes(rotationTimes), symlinkName, queueSize, threadName, logWriter); + } + + LogFileHandler( + Compression compression, + String filePattern, + long[] rotationTimes, + String symlinkName, + int queueSize, + String threadName, + LogWriter logWriter) { + this.logQueue = new LinkedBlockingQueue<>(queueSize); + this.logThread = new LogThread<>(logWriter, filePattern, compression, rotationTimes, symlinkName, threadName, this::poll); + this.logThread.start(); + } + + private Operation poll() throws InterruptedException { + return logQueue.poll(100, TimeUnit.MILLISECONDS); + } + + /** + * Sends logrecord to file, first rotating file if needed. + * + * @param r logrecord to publish + */ + public void publish(LOGTYPE r) { + addOperation(new Operation<>(r)); + } + + void publishAndWait(LOGTYPE r) { + addOperationAndWait(new Operation<>(r)); + } + + public void flush() { + addOperationAndWait(new Operation<>(Operation.Type.flush)); + } + + /** + * Force file rotation now, independent of schedule. + */ + void rotateNow() { + addOperationAndWait(new Operation<>(Operation.Type.rotate)); + } + + public void close() { + addOperationAndWait(new Operation<>(Operation.Type.close)); + } + + private void addOperation(Operation op) { + try { + logQueue.put(op); + } catch (InterruptedException e) { + } + } + + private void addOperationAndWait(Operation op) { + try { + logQueue.put(op); + op.countDownLatch.await(); + } catch (InterruptedException e) { + } + } + + /** + * Flushes all queued messages, interrupts the log thread in this and + * waits for it to end before returning + */ + void shutdown() { + logThread.interrupt(); + try { + logThread.executor.shutdownNow(); + logThread.executor.awaitTermination(600, TimeUnit.SECONDS); + logThread.join(); + } catch (InterruptedException e) { + } + } + + /** + * Calculate rotation times array, given times in minutes, as "0 60 ..." + */ + private static long[] calcTimesMinutes(String times) { + ArrayList list = new ArrayList<>(50); + int i = 0; + boolean etc = false; + + while (i < times.length()) { + if (times.charAt(i) == ' ') { + i++; + continue; + } // skip spaces + int j = i; // start of string + i = times.indexOf(' ', i); + if (i == -1) i = times.length(); + if (times.charAt(j) == '.' && times.substring(j, i).equals("...")) { // ... + etc = true; + break; + } + list.add(Long.valueOf(times.substring(j, i))); + } + + int size = list.size(); + long[] longtimes = new long[size]; + for (i = 0; i < size; i++) { + longtimes[i] = list.get(i) // pick up value in minutes past midnight + * 60000; // and multiply to get millis + } + + if (etc) { // fill out rest of day, same as final interval + long endOfDay = 24 * 60 * 60 * 1000; + long lasttime = longtimes[size - 1]; + long interval = lasttime - longtimes[size - 2]; + long moreneeded = (endOfDay - lasttime) / interval; + if (moreneeded > 0) { + int newsize = size + (int) moreneeded; + long[] temp = new long[newsize]; + for (i = 0; i < size; i++) { + temp[i] = longtimes[i]; + } + while (size < newsize) { + lasttime += interval; + temp[size++] = lasttime; + } + longtimes = temp; + } + } + + return longtimes; + } + + /** + * Only for unit testing. Do not use. + */ + String getFileName() { + return logThread.fileName; + } + + /** + * Handle logging and file operations + */ + static class LogThread extends Thread { + private final Pollable operationProvider; + long lastFlush = 0; + private PageCacheFriendlyFileOutputStream fileOutput = null; + private long nextRotationTime = 0; + private final String filePattern; // default to current directory, ms time stamp + private volatile String fileName; + private final LogWriter logWriter; + private final Compression compression; + private final long[] rotationTimes; + private final String symlinkName; + private final ExecutorService executor = createCompressionTaskExecutor(); + private final NativeIO nativeIO = new NativeIO(); + + + LogThread(LogWriter logWriter, + String filePattern, + Compression compression, + long[] rotationTimes, + String symlinkName, + String threadName, + Pollable operationProvider) { + super(threadName); + setDaemon(true); + this.logWriter = logWriter; + this.filePattern = filePattern; + this.compression = compression; + this.rotationTimes = rotationTimes; + this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null; + this.operationProvider = operationProvider; + } + + private static ExecutorService createCompressionTaskExecutor() { + return Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "logfilehandler.compression"); + thread.setDaemon(true); + thread.setPriority(Thread.MIN_PRIORITY); + return thread; + }); + } + + @Override + public void run() { + try { + handleLogOperations(); + } catch (InterruptedException e) { + } catch (Exception e) { + Process.logAndDie("Failed storing log records", e); + } + + internalFlush(); + } + + private void handleLogOperations() throws InterruptedException { + while (!isInterrupted()) { + Operation r = operationProvider.poll(); + if (r != null) { + if (r.type == Operation.Type.flush) { + internalFlush(); + } else if (r.type == Operation.Type.close) { + internalClose(); + } else if (r.type == Operation.Type.rotate) { + internalRotateNow(); + lastFlush = System.nanoTime(); + } else if (r.type == Operation.Type.log) { + internalPublish(r.log.get()); + flushIfOld(3, TimeUnit.SECONDS); + } + r.countDownLatch.countDown(); + } else { + flushIfOld(100, TimeUnit.MILLISECONDS); + } + } + } + + private void flushIfOld(long age, TimeUnit unit) { + long now = System.nanoTime(); + if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) { + internalFlush(); + lastFlush = now; + } + } + + private void internalFlush() { + try { + if (fileOutput != null) { + fileOutput.flush(); + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to flush file output: " + Exceptions.toMessageString(e), e); + } + } + + private void internalClose() { + try { + if (fileOutput != null) { + fileOutput.flush(); + fileOutput.close(); + fileOutput = null; + } + } catch (Exception e) { + logger.log(Level.WARNING, "Got error while closing log file: " + e.getMessage(), e); + } + } + + private void internalPublish(LOGTYPE r) { + // first check to see if new file needed. + // if so, use this.internalRotateNow() to do it + + long now = System.currentTimeMillis(); + if (nextRotationTime <= 0) { + nextRotationTime = getNextRotationTime(now); // lazy initialization + } + if (now > nextRotationTime || fileOutput == null) { + internalRotateNow(); + } + try { + logWriter.write(r, fileOutput); + fileOutput.write('\n'); + } catch (IOException e) { + logger.warning("Failed writing log record: " + Exceptions.toMessageString(e)); + } + } + + /** + * Find next rotation after specified time. + * + * @param now the specified time; if zero, current time is used. + * @return the next rotation time + */ + long getNextRotationTime(long now) { + if (now <= 0) { + now = System.currentTimeMillis(); + } + long nowTod = timeOfDayMillis(now); + long next = 0; + for (long rotationTime : rotationTimes) { + if (nowTod < rotationTime) { + next = rotationTime - nowTod + now; + break; + } + } + if (next == 0) { // didn't find one -- use 1st time 'tomorrow' + next = rotationTimes[0] + lengthOfDayMillis - nowTod + now; + } + + return next; + } + + private void checkAndCreateDir(String pathname) { + int lastSlash = pathname.lastIndexOf("/"); + if (lastSlash > -1) { + String pathExcludingFilename = pathname.substring(0, lastSlash); + File filepath = new File(pathExcludingFilename); + if (!filepath.exists()) { + filepath.mkdirs(); + } + } + } + + + // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as + // isInterrupted() returns false after interruption in p.waitFor + private void internalRotateNow() { + // figure out new file name, then + + String oldFileName = fileName; + long now = System.currentTimeMillis(); + fileName = LogFormatter.insertDate(filePattern, now); + internalClose(); + try { + checkAndCreateDir(fileName); + fileOutput = new PageCacheFriendlyFileOutputStream(nativeIO, Paths.get(fileName), 4 * 1024 * 1024); + LogFileDb.nowLoggingTo(fileName); + } catch (IOException e) { + throw new RuntimeException("Couldn't open log file '" + fileName + "'", e); + } + + if(oldFileName == null) oldFileName = getOldFileNameFromSymlink(); // To compress previous file, if so configured + createSymlinkToCurrentFile(); + + nextRotationTime = 0; //figure it out later (lazy evaluation) + if ((oldFileName != null)) { + Path oldFile = Paths.get(oldFileName); + if (Files.exists(oldFile)) { + executor.execute(() -> runCompression(nativeIO, oldFile, compression)); + } + } + } + + + private static void runCompression(NativeIO nativeIO, Path oldFile, Compression compression) { + switch (compression) { + case ZSTD: + runCompressionZstd(nativeIO, oldFile); + break; + case GZIP: + runCompressionGzip(nativeIO, oldFile); + break; + case NONE: + runCompressionNone(nativeIO, oldFile); + break; + default: + throw new IllegalArgumentException("Unknown compression " + compression); + } + } + + private static void runCompressionNone(NativeIO nativeIO, Path oldFile) { + nativeIO.dropFileFromCache(oldFile.toFile()); + } + + private static void runCompressionZstd(NativeIO nativeIO, Path oldFile) { + try { + Path compressedFile = Paths.get(oldFile.toString() + ".zst"); + int bufferSize = 2*1024*1024; + try (FileOutputStream fileOut = AtomicFileOutputStream.create(compressedFile); + ZstdOuputStream out = new ZstdOuputStream(fileOut, bufferSize); + FileInputStream in = new FileInputStream(oldFile.toFile())) { + pageFriendlyTransfer(nativeIO, out, fileOut.getFD(), in, bufferSize); + out.flush(); + } + Files.delete(oldFile); + nativeIO.dropFileFromCache(compressedFile.toFile()); + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e); + } finally { + nativeIO.dropFileFromCache(oldFile.toFile()); + } + } + + private static void runCompressionGzip(NativeIO nativeIO, Path oldFile) { + try { + Path gzippedFile = Paths.get(oldFile.toString() + ".gz"); + try (FileOutputStream fileOut = AtomicFileOutputStream.create(gzippedFile); + GZIPOutputStream compressor = new GZIPOutputStream(fileOut, 0x100000); + FileInputStream inputStream = new FileInputStream(oldFile.toFile())) { + pageFriendlyTransfer(nativeIO, compressor, fileOut.getFD(), inputStream, 0x400000); + compressor.finish(); + compressor.flush(); + } + Files.delete(oldFile); + nativeIO.dropFileFromCache(gzippedFile.toFile()); + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to compress log file with gzip: " + oldFile, e); + } finally { + nativeIO.dropFileFromCache(oldFile.toFile()); + } + } + + private static void pageFriendlyTransfer(NativeIO nativeIO, OutputStream out, FileDescriptor outDescriptor, FileInputStream in, int bufferSize) throws IOException { + int read; + long totalBytesRead = 0; + byte[] buffer = new byte[bufferSize]; + while ((read = in.read(buffer)) >= 0) { + out.write(buffer, 0, read); + if (read > 0) { + nativeIO.dropPartialFileFromCache(in.getFD(), totalBytesRead, read, false); + nativeIO.dropPartialFileFromCache(outDescriptor, totalBytesRead, read, false); + } + totalBytesRead += read; + } + } + + /** + * Name files by date - create a symlink with a constant name to the newest file + */ + private void createSymlinkToCurrentFile() { + if (symlinkName == null) return; + Path target = Paths.get(fileName); + Path link = target.resolveSibling(symlinkName); + try { + Files.deleteIfExists(link); + Files.createSymbolicLink(link, target.getFileName()); + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to create symbolic link to current log file: " + e.getMessage(), e); + } + } + + private String getOldFileNameFromSymlink() { + if(symlinkName == null) return null; + try { + return Paths.get(fileName).resolveSibling(symlinkName).toRealPath().toString(); + } catch (IOException e) { + return null; + } + } + + private static final long lengthOfDayMillis = 24 * 60 * 60 * 1000; + private static long timeOfDayMillis(long time) { + return time % lengthOfDayMillis; + } + + } + + private static class Operation { + enum Type {log, flush, close, rotate} + + final Type type; + + final Optional log; + final CountDownLatch countDownLatch = new CountDownLatch(1); + + Operation(Type type) { + this(type, Optional.empty()); + } + + Operation(LOGTYPE log) { + this(Type.log, Optional.of(log)); + } + + private Operation(Type type, Optional log) { + this.type = type; + this.log = log; + } + } + + /** File output stream that signals to kernel to drop previous pages after write */ + private static class PageCacheFriendlyFileOutputStream extends OutputStream { + + private final NativeIO nativeIO; + private final FileOutputStream fileOut; + private final BufferedOutputStream bufferedOut; + private final int bufferSize; + private long lastDropPosition = 0; + + PageCacheFriendlyFileOutputStream(NativeIO nativeIO, Path file, int bufferSize) throws FileNotFoundException { + this.nativeIO = nativeIO; + this.fileOut = new FileOutputStream(file.toFile(), true); + this.bufferedOut = new BufferedOutputStream(fileOut, bufferSize); + this.bufferSize = bufferSize; + } + + @Override public void write(byte[] b) throws IOException { bufferedOut.write(b); } + @Override public void write(byte[] b, int off, int len) throws IOException { bufferedOut.write(b, off, len); } + @Override public void write(int b) throws IOException { bufferedOut.write(b); } + @Override public void close() throws IOException { bufferedOut.close(); } + + @Override + public void flush() throws IOException { + bufferedOut.flush(); + long newPos = fileOut.getChannel().position(); + if (newPos >= lastDropPosition + bufferSize) { + nativeIO.dropPartialFileFromCache(fileOut.getFD(), lastDropPosition, newPos, true); + lastDropPosition = newPos; + } + } + } + + private static class AtomicFileOutputStream extends FileOutputStream { + private final Path path; + private final Path tmpPath; + private volatile boolean closed = false; + + private AtomicFileOutputStream(Path path, Path tmpPath) throws FileNotFoundException { + super(tmpPath.toFile()); + this.path = path; + this.tmpPath = tmpPath; + } + + @Override + public synchronized void close() throws IOException { + super.close(); + if (!closed) { + Files.move(tmpPath, path, StandardCopyOption.ATOMIC_MOVE); + closed = true; + } + } + + private static AtomicFileOutputStream create(Path path) throws FileNotFoundException { + return new AtomicFileOutputStream(path, path.resolveSibling("." + path.getFileName() + ".tmp")); + } + } +} -- cgit v1.2.3