diff options
author | Morten Tokle <mortent@verizonmedia.com> | 2021-01-25 14:32:04 +0100 |
---|---|---|
committer | Morten Tokle <mortent@verizonmedia.com> | 2021-01-25 14:32:04 +0100 |
commit | bab45d46a29cf43cc53b8c72102f12062ad88a82 (patch) | |
tree | b85861b067d51f84df0a3e369f66e52ffcb2ddc3 | |
parent | a168e65daed4aaf96c55e5956ed2d641776c0a7e (diff) |
Queue close,flush and rotate operations
-rw-r--r-- | jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java | 97 | ||||
-rw-r--r-- | jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java | 4 |
2 files changed, 67 insertions, 34 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java index bfb51d21c6c..7bd719c161f 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java @@ -18,11 +18,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPOutputStream; @@ -43,8 +44,7 @@ class LogFileHandler <LOGTYPE> { private final long[] rotationTimes; private final String filePattern; // default to current directory, ms time stamp private final String symlinkName; - private final ArrayBlockingQueue<LOGTYPE> logQueue = new ArrayBlockingQueue<>(100000); - private final AtomicBoolean rotate = new AtomicBoolean(false); + private final ArrayBlockingQueue<Operation<LOGTYPE>> logQueue = new ArrayBlockingQueue<>(100000); private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression")); private final NativeIO nativeIO = new NativeIO(); private final LogThread<LOGTYPE> logThread; @@ -73,20 +73,25 @@ class LogFileHandler <LOGTYPE> { com.yahoo.protect.Process.logAndDie("Failed storing log records", e); } - logFileHandler.flush(); + logFileHandler.internalFlush(); } private void storeLogRecords() throws InterruptedException { while (!isInterrupted()) { - LOGTYPE r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS); - if(logFileHandler.rotate.get()) { - logFileHandler.internalRotateNow(); - lastFlush = System.nanoTime(); - logFileHandler.rotate.set(false); - } - if (r != null) { - logFileHandler.internalPublish(r); - flushIfOld(3, TimeUnit.SECONDS); + Operation<LOGTYPE> r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS); + if (r!=null) { + if (r.type == Operation.Type.flush) { + logFileHandler.internalFlush(); + } else if (r.type == Operation.Type.close) { + logFileHandler.internalClose(); + } else if (r.type == Operation.Type.rotate) { + logFileHandler.internalRotateNow(); + lastFlush = System.nanoTime(); + } else if (r.type == Operation.Type.log) { + logFileHandler.internalPublish(r.log.get()); + flushIfOld(3, TimeUnit.SECONDS); + } + r.countDownLatch.countDown(); } else { flushIfOld(100, TimeUnit.MILLISECONDS); } @@ -96,7 +101,7 @@ class LogFileHandler <LOGTYPE> { private void flushIfOld(long age, TimeUnit unit) { long now = System.nanoTime(); if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) { - logFileHandler.flush(); + logFileHandler.internalFlush(); lastFlush = now; } } @@ -126,13 +131,14 @@ class LogFileHandler <LOGTYPE> { * @param r logrecord to publish */ public void publish(LOGTYPE r) { - try { - logQueue.put(r); - } catch (InterruptedException e) { - } + addOperation(new Operation<>(r)); + } + + public void flush() { + addOperationAndWait(new Operation<>(Operation.Type.flush)); } - public synchronized void flush() { + private synchronized void internalFlush() { try { FileOutputStream currentOut = this.currentOutputStream; if (currentOut != null) { @@ -152,8 +158,12 @@ class LogFileHandler <LOGTYPE> { } public void close() { + addOperationAndWait(new Operation<>(Operation.Type.close)); + } + + public void internalClose() { try { - flush(); + internalFlush(); FileOutputStream currentOut = this.currentOutputStream; if (currentOut != null) currentOut.close(); } catch (Exception e) { @@ -181,6 +191,20 @@ class LogFileHandler <LOGTYPE> { } } + private void addOperation(Operation<LOGTYPE> op) { + try { + logQueue.put(op); + } catch (InterruptedException e) { + } + } + private void addOperationAndWait(Operation<LOGTYPE> op) { + try { + logQueue.put(op); + op.countDownLatch.await(); + } catch (InterruptedException e) { + } + } + /** * Find next rotation after specified time. * @@ -206,16 +230,6 @@ class LogFileHandler <LOGTYPE> { return next; } - void waitDrained() { - while(! logQueue.isEmpty()) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - } - } - flush(); - } - private void checkAndCreateDir(String pathname) { int lastSlash = pathname.lastIndexOf("/"); if (lastSlash > -1) { @@ -231,7 +245,7 @@ class LogFileHandler <LOGTYPE> { * Force file rotation now, independent of schedule. */ void rotateNow () { - rotate.set(true); + addOperationAndWait(new Operation<>(Operation.Type.rotate)); } // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as @@ -243,7 +257,7 @@ class LogFileHandler <LOGTYPE> { String oldFileName = fileName; long now = System.currentTimeMillis(); fileName = LogFormatter.insertDate(filePattern, now); - flush(); + internalFlush(); try { checkAndCreateDir(fileName); @@ -426,4 +440,23 @@ class LogFileHandler <LOGTYPE> { return fileName; } + private static class Operation<LOGTYPE> { + enum Type {log, flush, close, rotate}; + + final Type type; + + final Optional<LOGTYPE> log; + final CountDownLatch countDownLatch = new CountDownLatch(1); + + Operation(Type type) { + this(type, Optional.empty()); + } + Operation(LOGTYPE log) { + this(Type.log, Optional.of(log)); + } + private Operation(Type type, Optional<LOGTYPE> log) { + this.type = type; + this.log = log; + } + } } diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java index f76312af61e..7b46e1540f6 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java +++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java @@ -127,7 +127,7 @@ public class LogFileHandlerTestCase { String longMessage = formatter.format(new LogRecord(Level.INFO, "string which is way longer than the word test")); handler.publish(longMessage); - handler.waitDrained(); + handler.flush(); assertThat(Files.size(Paths.get(firstFile))).isEqualTo(31); final long expectedSecondFileLength = 72; long secondFileLength; @@ -172,7 +172,7 @@ public class LogFileHandlerTestCase { for (int i = 0; i < logEntries; i++) { h.publish("test"); } - h.waitDrained(); + h.flush(); String f1 = h.getFileName(); assertThat(f1).startsWith(root.getAbsolutePath() + "/logfilehandlertest."); File uncompressed = new File(f1); |