From a72712457228d10ac31e9f7558dfa26d1e2404cd Mon Sep 17 00:00:00 2001 From: Morten Tokle Date: Mon, 25 Jan 2021 16:07:14 +0100 Subject: Move queue to LogFileHandler --- .../yahoo/container/logging/LogFileHandler.java | 49 +++++++++++++++------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java index f85d58524a8..72b54e3dce9 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPOutputStream; @@ -39,7 +40,7 @@ class LogFileHandler { enum Compression {NONE, GZIP, ZSTD} private final static Logger logger = Logger.getLogger(LogFileHandler.class.getName()); - + private final ArrayBlockingQueue> logQueue = new ArrayBlockingQueue<>(100000); final LogThread logThread; LogFileHandler(Compression compression, String filePattern, String rotationTimes, String symlinkName, LogWriter logWriter) { @@ -52,10 +53,18 @@ class LogFileHandler { long[] rotationTimes, String symlinkName, LogWriter logWriter) { - this.logThread = new LogThread<>(logWriter, filePattern, compression, rotationTimes, symlinkName); + this.logThread = new LogThread<>(logWriter, filePattern, compression, rotationTimes, symlinkName, this::poll); this.logThread.start(); } + private Operation poll() { + try { + return logQueue.poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // TODO: Handle interrupt + return null; + } + } /** * Sends logrecord to file, first rotating file if needed. @@ -83,14 +92,14 @@ class LogFileHandler { private void addOperation(Operation op) { try { - logThread.logQueue.put(op); + logQueue.put(op); } catch (InterruptedException e) { } } private void addOperationAndWait(Operation op) { try { - logThread.logQueue.put(op); + logQueue.put(op); op.countDownLatch.await(); } catch (InterruptedException e) { } @@ -103,9 +112,13 @@ class LogFileHandler { void shutdown() { logThread.interrupt(); try { - logThread.join(); - logThread.executor.shutdown(); + System.out.println(1); + logThread.executor.shutdownNow(); + System.out.println(2); logThread.executor.awaitTermination(600, TimeUnit.SECONDS); + System.out.println(3); + logThread.join(); + System.out.println(4); } catch (InterruptedException e) { } } @@ -173,14 +186,14 @@ class LogFileHandler { * Handle logging and file operations */ static class LogThread extends Thread { + private final Supplier> operationProvider; long lastFlush = 0; private FileOutputStream currentOutputStream = null; private long nextRotationTime = 0; private final String filePattern; // default to current directory, ms time stamp - private String fileName; + private volatile String fileName; private long lastDropPosition = 0; private final LogWriter logWriter; - private final ArrayBlockingQueue> logQueue = new ArrayBlockingQueue<>(100000); private final Compression compression; private final long[] rotationTimes; private final String symlinkName; @@ -188,7 +201,12 @@ class LogFileHandler { private final NativeIO nativeIO = new NativeIO(); - LogThread(LogWriter logWriter, String filePattern, Compression compression, long[] rotationTimes, String symlinkName) { + LogThread(LogWriter logWriter, + String filePattern, + Compression compression, + long[] rotationTimes, + String symlinkName, + Supplier> operationProvider) { super("Logger"); setDaemon(true); this.logWriter = logWriter; @@ -196,6 +214,7 @@ class LogFileHandler { this.compression = compression; this.rotationTimes = rotationTimes; this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null; + this.operationProvider = operationProvider; } @Override @@ -212,7 +231,7 @@ class LogFileHandler { private void storeLogRecords() throws InterruptedException { while (!isInterrupted()) { - Operation r = logQueue.poll(100, TimeUnit.MILLISECONDS); + Operation r = operationProvider.get(); if (r != null) { if (r.type == Operation.Type.flush) { internalFlush(); @@ -398,12 +417,12 @@ class LogFileHandler { private static void runCompressionGzip(File oldFile) { File gzippedFile = new File(oldFile.getPath() + ".gz"); + NativeIO nativeIO = new NativeIO(); try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000); FileInputStream inputStream = new FileInputStream(oldFile)) { byte[] buffer = new byte[0x400000]; // 4M buffer long totalBytesRead = 0; - NativeIO nativeIO = new NativeIO(); for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) { compressor.write(buffer, 0, read); nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false); @@ -412,11 +431,11 @@ class LogFileHandler { compressor.finish(); compressor.flush(); - oldFile.delete(); - nativeIO.dropFileFromCache(gzippedFile); } catch (IOException e) { logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'."); } + oldFile.delete(); + nativeIO.dropFileFromCache(gzippedFile); } /** @@ -439,9 +458,7 @@ class LogFileHandler { } } - // Support staff :-) - private static final long lengthOfDayMillis = 24 * 60 * 60 * 1000; // ? is this close enough ? - + private static final long lengthOfDayMillis = 24 * 60 * 60 * 1000; private static long timeOfDayMillis(long time) { return time % lengthOfDayMillis; } -- cgit v1.2.3