summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMorten Tokle <mortent@verizonmedia.com>2021-01-25 14:32:04 +0100
committerMorten Tokle <mortent@verizonmedia.com>2021-01-25 14:32:04 +0100
commitbab45d46a29cf43cc53b8c72102f12062ad88a82 (patch)
treeb85861b067d51f84df0a3e369f66e52ffcb2ddc3
parenta168e65daed4aaf96c55e5956ed2d641776c0a7e (diff)
Queue close,flush and rotate operations
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java97
-rw-r--r--jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java4
2 files changed, 67 insertions, 34 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java
index bfb51d21c6c..7bd719c161f 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java
@@ -18,11 +18,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
@@ -43,8 +44,7 @@ class LogFileHandler <LOGTYPE> {
private final long[] rotationTimes;
private final String filePattern; // default to current directory, ms time stamp
private final String symlinkName;
- private final ArrayBlockingQueue<LOGTYPE> logQueue = new ArrayBlockingQueue<>(100000);
- private final AtomicBoolean rotate = new AtomicBoolean(false);
+ private final ArrayBlockingQueue<Operation<LOGTYPE>> logQueue = new ArrayBlockingQueue<>(100000);
private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression"));
private final NativeIO nativeIO = new NativeIO();
private final LogThread<LOGTYPE> logThread;
@@ -73,20 +73,25 @@ class LogFileHandler <LOGTYPE> {
com.yahoo.protect.Process.logAndDie("Failed storing log records", e);
}
- logFileHandler.flush();
+ logFileHandler.internalFlush();
}
private void storeLogRecords() throws InterruptedException {
while (!isInterrupted()) {
- LOGTYPE r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS);
- if(logFileHandler.rotate.get()) {
- logFileHandler.internalRotateNow();
- lastFlush = System.nanoTime();
- logFileHandler.rotate.set(false);
- }
- if (r != null) {
- logFileHandler.internalPublish(r);
- flushIfOld(3, TimeUnit.SECONDS);
+ Operation<LOGTYPE> r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (r!=null) {
+ if (r.type == Operation.Type.flush) {
+ logFileHandler.internalFlush();
+ } else if (r.type == Operation.Type.close) {
+ logFileHandler.internalClose();
+ } else if (r.type == Operation.Type.rotate) {
+ logFileHandler.internalRotateNow();
+ lastFlush = System.nanoTime();
+ } else if (r.type == Operation.Type.log) {
+ logFileHandler.internalPublish(r.log.get());
+ flushIfOld(3, TimeUnit.SECONDS);
+ }
+ r.countDownLatch.countDown();
} else {
flushIfOld(100, TimeUnit.MILLISECONDS);
}
@@ -96,7 +101,7 @@ class LogFileHandler <LOGTYPE> {
private void flushIfOld(long age, TimeUnit unit) {
long now = System.nanoTime();
if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) {
- logFileHandler.flush();
+ logFileHandler.internalFlush();
lastFlush = now;
}
}
@@ -126,13 +131,14 @@ class LogFileHandler <LOGTYPE> {
* @param r logrecord to publish
*/
public void publish(LOGTYPE r) {
- try {
- logQueue.put(r);
- } catch (InterruptedException e) {
- }
+ addOperation(new Operation<>(r));
+ }
+
+ public void flush() {
+ addOperationAndWait(new Operation<>(Operation.Type.flush));
}
- public synchronized void flush() {
+ private synchronized void internalFlush() {
try {
FileOutputStream currentOut = this.currentOutputStream;
if (currentOut != null) {
@@ -152,8 +158,12 @@ class LogFileHandler <LOGTYPE> {
}
public void close() {
+ addOperationAndWait(new Operation<>(Operation.Type.close));
+ }
+
+ public void internalClose() {
try {
- flush();
+ internalFlush();
FileOutputStream currentOut = this.currentOutputStream;
if (currentOut != null) currentOut.close();
} catch (Exception e) {
@@ -181,6 +191,20 @@ class LogFileHandler <LOGTYPE> {
}
}
+ private void addOperation(Operation<LOGTYPE> op) {
+ try {
+ logQueue.put(op);
+ } catch (InterruptedException e) {
+ }
+ }
+ private void addOperationAndWait(Operation<LOGTYPE> op) {
+ try {
+ logQueue.put(op);
+ op.countDownLatch.await();
+ } catch (InterruptedException e) {
+ }
+ }
+
/**
* Find next rotation after specified time.
*
@@ -206,16 +230,6 @@ class LogFileHandler <LOGTYPE> {
return next;
}
- void waitDrained() {
- while(! logQueue.isEmpty()) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- }
- }
- flush();
- }
-
private void checkAndCreateDir(String pathname) {
int lastSlash = pathname.lastIndexOf("/");
if (lastSlash > -1) {
@@ -231,7 +245,7 @@ class LogFileHandler <LOGTYPE> {
* Force file rotation now, independent of schedule.
*/
void rotateNow () {
- rotate.set(true);
+ addOperationAndWait(new Operation<>(Operation.Type.rotate));
}
// Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as
@@ -243,7 +257,7 @@ class LogFileHandler <LOGTYPE> {
String oldFileName = fileName;
long now = System.currentTimeMillis();
fileName = LogFormatter.insertDate(filePattern, now);
- flush();
+ internalFlush();
try {
checkAndCreateDir(fileName);
@@ -426,4 +440,23 @@ class LogFileHandler <LOGTYPE> {
return fileName;
}
+ private static class Operation<LOGTYPE> {
+ enum Type {log, flush, close, rotate};
+
+ final Type type;
+
+ final Optional<LOGTYPE> log;
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ Operation(Type type) {
+ this(type, Optional.empty());
+ }
+ Operation(LOGTYPE log) {
+ this(Type.log, Optional.of(log));
+ }
+ private Operation(Type type, Optional<LOGTYPE> log) {
+ this.type = type;
+ this.log = log;
+ }
+ }
}
diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java
index f76312af61e..7b46e1540f6 100644
--- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java
+++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java
@@ -127,7 +127,7 @@ public class LogFileHandlerTestCase {
String longMessage = formatter.format(new LogRecord(Level.INFO, "string which is way longer than the word test"));
handler.publish(longMessage);
- handler.waitDrained();
+ handler.flush();
assertThat(Files.size(Paths.get(firstFile))).isEqualTo(31);
final long expectedSecondFileLength = 72;
long secondFileLength;
@@ -172,7 +172,7 @@ public class LogFileHandlerTestCase {
for (int i = 0; i < logEntries; i++) {
h.publish("test");
}
- h.waitDrained();
+ h.flush();
String f1 = h.getFileName();
assertThat(f1).startsWith(root.getAbsolutePath() + "/logfilehandlertest.");
File uncompressed = new File(f1);