diff options
Diffstat (limited to 'jdisc_http_service')
10 files changed, 520 insertions, 501 deletions
diff --git a/jdisc_http_service/pom.xml b/jdisc_http_service/pom.xml index 68a0f0636d3..2baba974b03 100644 --- a/jdisc_http_service/pom.xml +++ b/jdisc_http_service/pom.xml @@ -151,6 +151,11 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java index 15a38f6eeae..a6b800a9279 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/ConnectionLogEntry.java @@ -2,13 +2,6 @@ package com.yahoo.container.logging; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Slime; -import com.yahoo.slime.SlimeUtils; -import com.yahoo.slime.Type; -import com.yahoo.yolean.Exceptions; - -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Optional; import java.util.UUID; @@ -69,75 +62,6 @@ public class ConnectionLogEntry { this.sslHandshakeFailureType = builder.sslHandshakeFailureType; } - public String toJson() { - Slime slime = new Slime(); - Cursor cursor = slime.setObject(); - cursor.setString("id", id.toString()); - setTimestamp(cursor, timestamp, "timestamp"); - - setDouble(cursor, durationSeconds, "duration"); - setString(cursor, peerAddress, "peerAddress"); - setInteger(cursor, peerPort, "peerPort"); - setString(cursor, localAddress, "localAddress"); - setInteger(cursor, localPort, "localPort"); - setString(cursor, remoteAddress, "remoteAddress"); - setInteger(cursor, remotePort, "remotePort"); - setLong(cursor, httpBytesReceived, "httpBytesReceived"); - setLong(cursor, httpBytesSent, "httpBytesSent"); - setLong(cursor, requests, "requests"); - setLong(cursor, responses, "responses"); - setString(cursor, sslProtocol, "ssl", "protocol"); - setString(cursor, sslSessionId, "ssl", "sessionId"); - setString(cursor, sslCipherSuite, "ssl", "cipherSuite"); - setString(cursor, sslPeerSubject, "ssl", "peerSubject"); - setTimestamp(cursor, sslPeerNotBefore, "ssl", "peerNotBefore"); - setTimestamp(cursor, sslPeerNotAfter, "ssl", "peerNotAfter"); - setString(cursor, sslSniServerName, "ssl", "sniServerName"); - setString(cursor, sslHandshakeFailureException, "ssl", "handshake-failure", "exception"); - setString(cursor, sslHandshakeFailureMessage, "ssl", "handshake-failure", "message"); - setString(cursor, sslHandshakeFailureType, "ssl", "handshake-failure", "type"); - return new String(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(slime)), StandardCharsets.UTF_8); - } - - private void setString(Cursor cursor, String value, String... keys) { - if(value != null) { - subCursor(cursor, keys).setString(keys[keys.length - 1], value); - } - } - - private void setLong(Cursor cursor, Long value, String... keys) { - if (value != null) { - subCursor(cursor, keys).setLong(keys[keys.length - 1], value); - } - } - - private void setInteger(Cursor cursor, Integer value, String... keys) { - if (value != null) { - subCursor(cursor, keys).setLong(keys[keys.length - 1], value); - } - } - - private void setTimestamp(Cursor cursor, Instant timestamp, String... keys) { - if (timestamp != null) { - subCursor(cursor, keys).setString(keys[keys.length - 1], timestamp.toString()); - } - } - - private void setDouble(Cursor cursor, Double value, String... keys) { - if (value != null) { - subCursor(cursor, keys).setDouble(keys[keys.length - 1], value); - } - } - - private static Cursor subCursor(Cursor cursor, String... keys) { - Cursor subCursor = cursor; - for (int i = 0; i < keys.length - 1; ++i) { - Cursor field = subCursor.field(keys[i]); - subCursor = field.type() != Type.NIX ? field : subCursor.setObject(keys[i]); - } - return subCursor; - } - public static Builder builder(UUID id, Instant timestamp) { return new Builder(id, timestamp); } diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java index 62e53a5a514..968ba74b4f2 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FileConnectionLog.java @@ -5,23 +5,19 @@ package com.yahoo.container.logging; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; import java.util.logging.Logger; /** * @author mortent */ -public class FileConnectionLog extends AbstractComponent implements ConnectionLog, LogWriter<ConnectionLogEntry> { +public class FileConnectionLog extends AbstractComponent implements ConnectionLog { private static final Logger logger = Logger.getLogger(FileConnectionLog.class.getName()); private final ConnectionLogHandler logHandler; @Inject public FileConnectionLog(ConnectionLogConfig config) { - logHandler = new ConnectionLogHandler(config.cluster(), this); + logHandler = new ConnectionLogHandler(config.cluster(), new JsonConnectionLogWriter()); } @Override @@ -34,9 +30,4 @@ public class FileConnectionLog extends AbstractComponent implements ConnectionLo logHandler.shutdown(); } - @Override - // TODO serialize directly to outputstream - public void write(ConnectionLogEntry entry, OutputStream outputStream) throws IOException { - outputStream.write(entry.toJson().getBytes(StandardCharsets.UTF_8)); - } }
\ No newline at end of file diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java new file mode 100644 index 00000000000..ee780ad2a83 --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/FormatUtil.java @@ -0,0 +1,46 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.logging; + +import com.fasterxml.jackson.core.JsonGenerator; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; + +/** + * @author bjorncs + */ +class FormatUtil { + + private FormatUtil() {} + + static void writeSecondsField(JsonGenerator generator, String fieldName, Instant instant) throws IOException { + writeSecondsField(generator, fieldName, instant.toEpochMilli()); + } + + static void writeSecondsField(JsonGenerator generator, String fieldName, Duration duration) throws IOException { + writeSecondsField(generator, fieldName, duration.toMillis()); + } + + static void writeSecondsField(JsonGenerator generator, String fieldName, double seconds) throws IOException { + writeSecondsField(generator, fieldName, (long)(seconds * 1000)); + } + + static void writeSecondsField(JsonGenerator generator, String fieldName, long milliseconds) throws IOException { + generator.writeFieldName(fieldName); + generator.writeRawValue(toSecondsString(milliseconds)); + } + + /** @return a string with number of seconds with 3 decimals */ + static String toSecondsString(long milliseconds) { + StringBuilder builder = new StringBuilder().append(milliseconds / 1000L).append('.'); + long decimals = milliseconds % 1000; + if (decimals < 100) { + builder.append('0'); + if (decimals < 10) { + builder.append('0'); + } + } + return builder.append(decimals).toString(); + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java index c6d177684ac..441e139bc67 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JSONFormatter.java @@ -15,6 +15,8 @@ import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.container.logging.FormatUtil.writeSecondsField; + /** * Formatting of an {@link AccessLogEntry} in the Vespa JSON access log format. * @@ -45,8 +47,8 @@ public class JSONFormatter implements LogWriter<RequestLogEntry> { String peerAddress = entry.peerAddress().get(); generator.writeStringField("ip", peerAddress); long time = entry.timestamp().get().toEpochMilli(); - writeSeconds(generator, "time", time); - writeSeconds(generator, "duration", entry.duration().get().toMillis()); + FormatUtil.writeSecondsField(generator, "time", time); + FormatUtil.writeSecondsField(generator, "duration", entry.duration().get()); generator.writeNumberField("responsesize", entry.contentSize().orElse(0)); generator.writeNumberField("code", entry.statusCode().orElse(0)); generator.writeStringField("method", entry.httpMethod().orElse("")); @@ -185,23 +187,4 @@ public class JSONFormatter implements LogWriter<RequestLogEntry> { if (rawPath == null) return null; return rawQuery != null ? rawPath + "?" + rawQuery : rawPath; } - - private static void writeSeconds(JsonGenerator generator, String fieldName, long milliseconds) throws IOException { - generator.writeFieldName(fieldName); - generator.writeRawValue(toSecondsString(milliseconds)); - } - - /** @return a string with number of seconds with 3 decimals */ - private static String toSecondsString(long milliseconds) { - StringBuilder builder = new StringBuilder().append(milliseconds / 1000L).append('.'); - long decimals = milliseconds % 1000; - if (decimals < 100) { - builder.append('0'); - if (decimals < 10) { - builder.append('0'); - } - } - return builder.append(decimals).toString(); - } - } diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java new file mode 100644 index 00000000000..394f87c07cc --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/JsonConnectionLogWriter.java @@ -0,0 +1,117 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.logging; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.OutputStream; +import java.time.Instant; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +/** + * @author bjorncs + */ +class JsonConnectionLogWriter implements LogWriter<ConnectionLogEntry> { + + private final JsonFactory jsonFactory = new JsonFactory(new ObjectMapper()); + + @Override + public void write(ConnectionLogEntry record, OutputStream outputStream) throws IOException { + try (JsonGenerator generator = createJsonGenerator(outputStream)) { + generator.writeStartObject(); + generator.writeStringField("id", record.id()); + generator.writeStringField("timestamp", record.timestamp().toString()); + + writeOptionalSeconds(generator, "duration", unwrap(record.durationSeconds())); + writeOptionalString(generator, "peerAddress", unwrap(record.peerAddress())); + writeOptionalInteger(generator, "peerPort", unwrap(record.peerPort())); + writeOptionalString(generator, "localAddress", unwrap(record.localAddress())); + writeOptionalInteger(generator, "localPort", unwrap(record.localPort())); + writeOptionalString(generator, "remoteAddress", unwrap(record.remoteAddress())); + writeOptionalInteger(generator, "remotePort", unwrap(record.remotePort())); + writeOptionalLong(generator, "httpBytesReceived", unwrap(record.httpBytesReceived())); + writeOptionalLong(generator, "httpBytesSent", unwrap(record.httpBytesSent())); + writeOptionalLong(generator, "requests", unwrap(record.requests())); + writeOptionalLong(generator, "responses", unwrap(record.responses())); + + String sslProtocol = unwrap(record.sslProtocol()); + String sslSessionId = unwrap(record.sslSessionId()); + String sslCipherSuite = unwrap(record.sslCipherSuite()); + String sslPeerSubject = unwrap(record.sslPeerSubject()); + Instant sslPeerNotBefore = unwrap(record.sslPeerNotBefore()); + Instant sslPeerNotAfter = unwrap(record.sslPeerNotAfter()); + String sslSniServerName = unwrap(record.sslSniServerName()); + String sslHandshakeFailureException = unwrap(record.sslHandshakeFailureException()); + String sslHandshakeFailureMessage = unwrap(record.sslHandshakeFailureMessage()); + String sslHandshakeFailureType = unwrap(record.sslHandshakeFailureType()); + + if (isAnyValuePresent( + sslProtocol, sslSessionId, sslCipherSuite, sslPeerSubject, sslPeerNotBefore, sslPeerNotAfter, + sslSniServerName, sslHandshakeFailureException, sslHandshakeFailureMessage, sslHandshakeFailureType)) { + generator.writeObjectFieldStart("ssl"); + + writeOptionalString(generator, "protocol", sslProtocol); + writeOptionalString(generator, "sessionId", sslSessionId); + writeOptionalString(generator, "cipherSuite", sslCipherSuite); + writeOptionalString(generator, "peerSubject", sslPeerSubject); + writeOptionalTimestamp(generator, "peerNotBefore", sslPeerNotBefore); + writeOptionalTimestamp(generator, "peerNotAfter", sslPeerNotAfter); + writeOptionalString(generator, "sniServerName", sslSniServerName); + + if (isAnyValuePresent(sslHandshakeFailureException, sslHandshakeFailureMessage, sslHandshakeFailureType)) { + generator.writeObjectFieldStart("handshake-failure"); + writeOptionalString(generator, "exception", sslHandshakeFailureException); + writeOptionalString(generator, "message", sslHandshakeFailureMessage); + writeOptionalString(generator, "type", sslHandshakeFailureType); + generator.writeEndObject(); + } + + generator.writeEndObject(); + } + } + } + + private void writeOptionalString(JsonGenerator generator, String name, String value) throws IOException { + if (value != null) { + generator.writeStringField(name, value); + } + } + + private void writeOptionalInteger(JsonGenerator generator, String name, Integer value) throws IOException { + if (value != null) { + generator.writeNumberField(name, value); + } + } + + private void writeOptionalLong(JsonGenerator generator, String name, Long value) throws IOException { + if (value != null) { + generator.writeNumberField(name, value); + } + } + + private void writeOptionalTimestamp(JsonGenerator generator, String name, Instant value) throws IOException { + if (value != null) { + generator.writeStringField(name, value.toString()); + } + } + + private void writeOptionalSeconds(JsonGenerator generator, String name, Double value) throws IOException { + if (value != null) { + FormatUtil.writeSecondsField(generator, name, value); + } + } + + private static boolean isAnyValuePresent(Object... values) { return Arrays.stream(values).anyMatch(Objects::nonNull); } + private static <T> T unwrap(Optional<T> maybeValue) { return maybeValue.orElse(null); } + + private JsonGenerator createJsonGenerator(OutputStream outputStream) throws IOException { + return jsonFactory.createGenerator(outputStream, JsonEncoding.UTF8) + .configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false) + .configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false); + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java index d0f31a6b866..bfb51d21c6c 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java +++ b/jdisc_http_service/src/main/java/com/yahoo/container/logging/LogFileHandler.java @@ -5,7 +5,6 @@ import com.yahoo.compress.ZstdOuputStream; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.io.NativeIO; import com.yahoo.log.LogFileDb; -import com.yahoo.protect.Process; import com.yahoo.system.ProcessExecuter; import com.yahoo.yolean.Exceptions; @@ -19,13 +18,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPOutputStream; @@ -36,15 +33,74 @@ import java.util.zip.GZIPOutputStream; * @author Bob Travis * @author bjorncs */ -class LogFileHandler <LOGTYPE> { +class LogFileHandler <LOGTYPE> { - enum Compression {NONE, GZIP, ZSTD} + enum Compression { NONE, GZIP, ZSTD } private final static Logger logger = Logger.getLogger(LogFileHandler.class.getName()); - private final ArrayBlockingQueue<Operation<LOGTYPE>> logQueue = new ArrayBlockingQueue<>(10000); - final LogThread<LOGTYPE> logThread; - @FunctionalInterface private interface Pollable<T> { Operation<T> poll() throws InterruptedException; } + private final Compression compression; + private final long[] rotationTimes; + private final String filePattern; // default to current directory, ms time stamp + private final String symlinkName; + private final ArrayBlockingQueue<LOGTYPE> logQueue = new ArrayBlockingQueue<>(100000); + private final AtomicBoolean rotate = new AtomicBoolean(false); + private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression")); + private final NativeIO nativeIO = new NativeIO(); + private final LogThread<LOGTYPE> logThread; + + private volatile FileOutputStream currentOutputStream = null; + private volatile long nextRotationTime = 0; + private volatile String fileName; + private volatile long lastDropPosition = 0; + + private final LogWriter<LOGTYPE> logWriter; + + static private class LogThread<LOGTYPE> extends Thread { + final LogFileHandler<LOGTYPE> logFileHandler; + long lastFlush = 0; + LogThread(LogFileHandler<LOGTYPE> logFile) { + super("Logger"); + setDaemon(true); + logFileHandler = logFile; + } + @Override + public void run() { + try { + storeLogRecords(); + } catch (InterruptedException e) { + } catch (Exception e) { + com.yahoo.protect.Process.logAndDie("Failed storing log records", e); + } + + logFileHandler.flush(); + } + + private void storeLogRecords() throws InterruptedException { + while (!isInterrupted()) { + LOGTYPE r = logFileHandler.logQueue.poll(100, TimeUnit.MILLISECONDS); + if(logFileHandler.rotate.get()) { + logFileHandler.internalRotateNow(); + lastFlush = System.nanoTime(); + logFileHandler.rotate.set(false); + } + if (r != null) { + logFileHandler.internalPublish(r); + flushIfOld(3, TimeUnit.SECONDS); + } else { + flushIfOld(100, TimeUnit.MILLISECONDS); + } + } + } + + private void flushIfOld(long age, TimeUnit unit) { + long now = System.nanoTime(); + if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) { + logFileHandler.flush(); + lastFlush = now; + } + } + } LogFileHandler(Compression compression, String filePattern, String rotationTimes, String symlinkName, LogWriter<LOGTYPE> logWriter) { this(compression, filePattern, calcTimesMinutes(rotationTimes), symlinkName, logWriter); @@ -54,433 +110,320 @@ class LogFileHandler <LOGTYPE> { Compression compression, String filePattern, long[] rotationTimes, - String symlinkName, - LogWriter<LOGTYPE> logWriter) { - this.logThread = new LogThread<LOGTYPE>(logWriter, filePattern, compression, rotationTimes, symlinkName, this::poll); + String symlinkName, LogWriter<LOGTYPE> logWriter) { + this.compression = compression; + this.filePattern = filePattern; + this.rotationTimes = rotationTimes; + this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null; + this.logWriter = logWriter; + this.logThread = new LogThread<>(this); this.logThread.start(); } - private Operation<LOGTYPE> poll() throws InterruptedException { - return logQueue.poll(100, TimeUnit.MILLISECONDS); - } - /** * Sends logrecord to file, first rotating file if needed. * * @param r logrecord to publish */ public void publish(LOGTYPE r) { - addOperation(new Operation<>(r)); - } - - public void flush() { - addOperationAndWait(new Operation<>(Operation.Type.flush)); - } - - /** - * Force file rotation now, independent of schedule. - */ - void rotateNow() { - addOperationAndWait(new Operation<>(Operation.Type.rotate)); - } - - public void close() { - addOperationAndWait(new Operation<>(Operation.Type.close)); + try { + logQueue.put(r); + } catch (InterruptedException e) { + } } - private void addOperation(Operation<LOGTYPE> op) { + public synchronized void flush() { try { - logQueue.put(op); - } catch (InterruptedException e) { + FileOutputStream currentOut = this.currentOutputStream; + if (currentOut != null) { + if (compression == Compression.GZIP) { + long newPos = currentOut.getChannel().position(); + if (newPos > lastDropPosition + 102400) { + nativeIO.dropPartialFileFromCache(currentOut.getFD(), lastDropPosition, newPos, true); + lastDropPosition = newPos; + } + } else { + currentOut.flush(); + } + } + } catch (IOException e) { + logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e)); } } - private void addOperationAndWait(Operation<LOGTYPE> op) { + public void close() { try { - logQueue.put(op); - op.countDownLatch.await(); - } catch (InterruptedException e) { + flush(); + FileOutputStream currentOut = this.currentOutputStream; + if (currentOut != null) currentOut.close(); + } catch (Exception e) { + logger.log(Level.WARNING, "Got error while closing log file", e); } } - /** - * Flushes all queued messages, interrupts the log thread in this and - * waits for it to end before returning - */ - void shutdown() { - logThread.interrupt(); + private void internalPublish(LOGTYPE r) { + // first check to see if new file needed. + // if so, use this.internalRotateNow() to do it + + long now = System.currentTimeMillis(); + if (nextRotationTime <= 0) { + nextRotationTime = getNextRotationTime(now); // lazy initialization + } + if (now > nextRotationTime || currentOutputStream == null) { + internalRotateNow(); + } try { - logThread.executor.shutdownNow(); - logThread.executor.awaitTermination(600, TimeUnit.SECONDS); - logThread.join(); - } catch (InterruptedException e) { + FileOutputStream out = this.currentOutputStream; + logWriter.write(r, out); + out.write('\n'); + } catch (IOException e) { + logger.warning("Failed writing log record: " + Exceptions.toMessageString(e)); } } /** - * Calculate rotation times array, given times in minutes, as "0 60 ..." + * Find next rotation after specified time. + * + * @param now the specified time; if zero, current time is used. + * @return the next rotation time */ - private static long[] calcTimesMinutes(String times) { - ArrayList<Long> list = new ArrayList<>(50); - int i = 0; - boolean etc = false; - - while (i < times.length()) { - if (times.charAt(i) == ' ') { - i++; - continue; - } // skip spaces - int j = i; // start of string - i = times.indexOf(' ', i); - if (i == -1) i = times.length(); - if (times.charAt(j) == '.' && times.substring(j, i).equals("...")) { // ... - etc = true; + long getNextRotationTime (long now) { + if (now <= 0) { + now = System.currentTimeMillis(); + } + long nowTod = timeOfDayMillis(now); + long next = 0; + for (long rotationTime : rotationTimes) { + if (nowTod < rotationTime) { + next = rotationTime-nowTod + now; break; } - list.add(Long.valueOf(times.substring(j, i))); } - - int size = list.size(); - long[] longtimes = new long[size]; - for (i = 0; i < size; i++) { - longtimes[i] = list.get(i) // pick up value in minutes past midnight - * 60000; // and multiply to get millis + if (next == 0) { // didn't find one -- use 1st time 'tomorrow' + next = rotationTimes[0]+lengthOfDayMillis-nowTod + now; } - if (etc) { // fill out rest of day, same as final interval - long endOfDay = 24 * 60 * 60 * 1000; - long lasttime = longtimes[size - 1]; - long interval = lasttime - longtimes[size - 2]; - long moreneeded = (endOfDay - lasttime) / interval; - if (moreneeded > 0) { - int newsize = size + (int) moreneeded; - long[] temp = new long[newsize]; - for (i = 0; i < size; i++) { - temp[i] = longtimes[i]; - } - while (size < newsize) { - lasttime += interval; - temp[size++] = lasttime; - } - longtimes = temp; + return next; + } + + void waitDrained() { + while(! logQueue.isEmpty()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { } } + flush(); + } - return longtimes; + private void checkAndCreateDir(String pathname) { + int lastSlash = pathname.lastIndexOf("/"); + if (lastSlash > -1) { + String pathExcludingFilename = pathname.substring(0, lastSlash); + File filepath = new File(pathExcludingFilename); + if (!filepath.exists()) { + filepath.mkdirs(); + } + } } /** - * Only for unit testing. Do not use. + * Force file rotation now, independent of schedule. */ - String getFileName() { - return logThread.fileName; + void rotateNow () { + rotate.set(true); } - /** - * Handle logging and file operations - */ - static class LogThread<LOGTYPE> extends Thread { - private final Pollable<LOGTYPE> operationProvider; - long lastFlush = 0; - private FileOutputStream currentOutputStream = null; - private long nextRotationTime = 0; - private final String filePattern; // default to current directory, ms time stamp - private volatile String fileName; - private long lastDropPosition = 0; - private final LogWriter<LOGTYPE> logWriter; - private final Compression compression; - private final long[] rotationTimes; - private final String symlinkName; - private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("logfilehandler.compression")); - private final NativeIO nativeIO = new NativeIO(); - - - LogThread(LogWriter<LOGTYPE> logWriter, - String filePattern, - Compression compression, - long[] rotationTimes, - String symlinkName, - Pollable<LOGTYPE> operationProvider) { - super("Logger"); - setDaemon(true); - this.logWriter = logWriter; - this.filePattern = filePattern; - this.compression = compression; - this.rotationTimes = rotationTimes; - this.symlinkName = (symlinkName != null && !symlinkName.isBlank()) ? symlinkName : null; - this.operationProvider = operationProvider; - } + // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as + // isInterrupted() returns false after interruption in p.waitFor + private void internalRotateNow() { + // figure out new file name, then + // use super.setOutputStream to switch to a new file - @Override - public void run() { - try { - handleLogOperations(); - } catch (InterruptedException e) { - } catch (Exception e) { - Process.logAndDie("Failed storing log records", e); - } + String oldFileName = fileName; + long now = System.currentTimeMillis(); + fileName = LogFormatter.insertDate(filePattern, now); + flush(); - internalFlush(); + try { + checkAndCreateDir(fileName); + FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety + currentOutputStream = os; + lastDropPosition = 0; + LogFileDb.nowLoggingTo(fileName); } - - private void handleLogOperations() throws InterruptedException { - while (!isInterrupted()) { - Operation<LOGTYPE> r = operationProvider.poll(); - if (r != null) { - if (r.type == Operation.Type.flush) { - internalFlush(); - } else if (r.type == Operation.Type.close) { - internalClose(); - } else if (r.type == Operation.Type.rotate) { - internalRotateNow(); - lastFlush = System.nanoTime(); - } else if (r.type == Operation.Type.log) { - internalPublish(r.log.get()); - flushIfOld(3, TimeUnit.SECONDS); - } - r.countDownLatch.countDown(); - } else { - flushIfOld(100, TimeUnit.MILLISECONDS); - } - } + catch (IOException e) { + throw new RuntimeException("Couldn't open log file '" + fileName + "'", e); } - private void flushIfOld(long age, TimeUnit unit) { - long now = System.nanoTime(); - if (TimeUnit.NANOSECONDS.toMillis(now - lastFlush) > unit.toMillis(age)) { - internalFlush(); - lastFlush = now; - } - } + createSymlinkToCurrentFile(); - private synchronized void internalFlush() { - try { - FileOutputStream currentOut = this.currentOutputStream; - if (currentOut != null) { - if (compression == Compression.GZIP) { - long newPos = currentOut.getChannel().position(); - if (newPos > lastDropPosition + 102400) { - nativeIO.dropPartialFileFromCache(currentOut.getFD(), lastDropPosition, newPos, true); - lastDropPosition = newPos; - } - } else { - currentOut.flush(); - } + nextRotationTime = 0; //figure it out later (lazy evaluation) + if ((oldFileName != null)) { + File oldFile = new File(oldFileName); + if (oldFile.exists()) { + if (compression != Compression.NONE) { + executor.execute(() -> runCompression(oldFile, compression)); + } else { + nativeIO.dropFileFromCache(oldFile); } - } catch (IOException e) { - logger.warning("Failed dropping from cache : " + Exceptions.toMessageString(e)); - } - } - - private void internalClose() { - try { - internalFlush(); - FileOutputStream currentOut = this.currentOutputStream; - if (currentOut != null) currentOut.close(); - } catch (Exception e) { - logger.log(Level.WARNING, "Got error while closing log file", e); - } - } - - private void internalPublish(LOGTYPE r) { - // first check to see if new file needed. - // if so, use this.internalRotateNow() to do it - - long now = System.currentTimeMillis(); - if (nextRotationTime <= 0) { - nextRotationTime = getNextRotationTime(now); // lazy initialization - } - if (now > nextRotationTime || currentOutputStream == null) { - internalRotateNow(); - } - try { - FileOutputStream out = this.currentOutputStream; - logWriter.write(r, out); - out.write('\n'); - } catch (IOException e) { - logger.warning("Failed writing log record: " + Exceptions.toMessageString(e)); } } + } - /** - * Find next rotation after specified time. - * - * @param now the specified time; if zero, current time is used. - * @return the next rotation time - */ - long getNextRotationTime(long now) { - if (now <= 0) { - now = System.currentTimeMillis(); - } - long nowTod = timeOfDayMillis(now); - long next = 0; - for (long rotationTime : rotationTimes) { - if (nowTod < rotationTime) { - next = rotationTime - nowTod + now; - break; - } - } - if (next == 0) { // didn't find one -- use 1st time 'tomorrow' - next = rotationTimes[0] + lengthOfDayMillis - nowTod + now; - } - return next; + private static void runCompression(File oldFile, Compression compression) { + switch (compression) { + case ZSTD: + runCompressionZstd(oldFile.toPath()); + break; + case GZIP: + runCompressionGzip(oldFile); + break; + default: + throw new IllegalArgumentException("Unknown compression " + compression); } + } - private void checkAndCreateDir(String pathname) { - int lastSlash = pathname.lastIndexOf("/"); - if (lastSlash > -1) { - String pathExcludingFilename = pathname.substring(0, lastSlash); - File filepath = new File(pathExcludingFilename); - if (!filepath.exists()) { - filepath.mkdirs(); + private static void runCompressionZstd(Path oldFile) { + try { + Path compressedFile = Paths.get(oldFile.toString() + ".zst"); + Files.createFile(compressedFile); + int bufferSize = 0x400000; // 4M + byte[] buffer = new byte[bufferSize]; + try (ZstdOuputStream out = new ZstdOuputStream(Files.newOutputStream(compressedFile), bufferSize); + InputStream in = Files.newInputStream(oldFile)) { + int read; + while ((read = in.read(buffer)) >= 0) { + out.write(buffer, 0, read); } + out.flush(); } + Files.delete(oldFile); + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e); } + } + private static void runCompressionGzip(File oldFile) { + File gzippedFile = new File(oldFile.getPath() + ".gz"); + try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000); + FileInputStream inputStream = new FileInputStream(oldFile)) + { + byte [] buffer = new byte[0x400000]; // 4M buffer - // Throw InterruptedException upwards rather than relying on isInterrupted to stop the thread as - // isInterrupted() returns false after interruption in p.waitFor - private void internalRotateNow() { - // figure out new file name, then - // use super.setOutputStream to switch to a new file - - String oldFileName = fileName; - long now = System.currentTimeMillis(); - fileName = LogFormatter.insertDate(filePattern, now); - internalFlush(); - - try { - checkAndCreateDir(fileName); - FileOutputStream os = new FileOutputStream(fileName, true); // append mode, for safety - currentOutputStream = os; - lastDropPosition = 0; - LogFileDb.nowLoggingTo(fileName); - } catch (IOException e) { - throw new RuntimeException("Couldn't open log file '" + fileName + "'", e); + long totalBytesRead = 0; + NativeIO nativeIO = new NativeIO(); + for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) { + compressor.write(buffer, 0, read); + nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false); + totalBytesRead += read; } + compressor.finish(); + compressor.flush(); - createSymlinkToCurrentFile(); + oldFile.delete(); + nativeIO.dropFileFromCache(gzippedFile); + } catch (IOException e) { + logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'."); + } + } - nextRotationTime = 0; //figure it out later (lazy evaluation) - if ((oldFileName != null)) { - File oldFile = new File(oldFileName); - if (oldFile.exists()) { - if (compression != Compression.NONE) { - executor.execute(() -> runCompression(oldFile, compression)); - } else { - nativeIO.dropFileFromCache(oldFile); - } - } + /** Name files by date - create a symlink with a constant name to the newest file */ + private void createSymlinkToCurrentFile() { + if (symlinkName == null) return; + File f = new File(fileName); + File f2 = new File(f.getParent(), symlinkName); + String [] cmd = new String[]{"/bin/ln", "-sf", f.getName(), f2.getPath()}; + try { + int retval = new ProcessExecuter().exec(cmd).getFirst(); + // Detonator pattern: Think of all the fun we can have if ln isn't what we + // think it is, if it doesn't return, etc, etc + if (retval != 0) { + logger.warning("Command '" + Arrays.toString(cmd) + "' + failed with exitcode=" + retval); } + } catch (IOException e) { + logger.warning("Got '" + e + "' while doing'" + Arrays.toString(cmd) + "'."); } + } + /** + * Calculate rotation times array, given times in minutes, as "0 60 ..." + * + */ + private static long[] calcTimesMinutes(String times) { + ArrayList<Long> list = new ArrayList<>(50); + int i = 0; + boolean etc = false; - private static void runCompression(File oldFile, Compression compression) { - switch (compression) { - case ZSTD: - runCompressionZstd(oldFile.toPath()); - break; - case GZIP: - runCompressionGzip(oldFile); - break; - default: - throw new IllegalArgumentException("Unknown compression " + compression); + while (i < times.length()) { + if (times.charAt(i) == ' ') { i++; continue; } // skip spaces + int j = i; // start of string + i = times.indexOf(' ', i); + if (i == -1) i = times.length(); + if (times.charAt(j) == '.' && times.substring(j,i).equals("...")) { // ... + etc = true; + break; } + list.add(Long.valueOf(times.substring(j,i))); } - private static void runCompressionZstd(Path oldFile) { - try { - Path compressedFile = Paths.get(oldFile.toString() + ".zst"); - Files.createFile(compressedFile); - int bufferSize = 0x400000; // 4M - byte[] buffer = new byte[bufferSize]; - try (ZstdOuputStream out = new ZstdOuputStream(Files.newOutputStream(compressedFile), bufferSize); - InputStream in = Files.newInputStream(oldFile)) { - int read; - while ((read = in.read(buffer)) >= 0) { - out.write(buffer, 0, read); - } - out.flush(); - } - Files.delete(oldFile); - } catch (IOException e) { - logger.log(Level.WARNING, "Failed to compress log file with zstd: " + oldFile, e); - } + int size = list.size(); + long[] longtimes = new long[size]; + for (i = 0; i<size; i++) { + longtimes[i] = list.get(i) // pick up value in minutes past midnight + * 60000; // and multiply to get millis } - private static void runCompressionGzip(File oldFile) { - File gzippedFile = new File(oldFile.getPath() + ".gz"); - NativeIO nativeIO = new NativeIO(); - try (GZIPOutputStream compressor = new GZIPOutputStream(new FileOutputStream(gzippedFile), 0x100000); - FileInputStream inputStream = new FileInputStream(oldFile)) { - byte[] buffer = new byte[0x400000]; // 4M buffer - - long totalBytesRead = 0; - for (int read = inputStream.read(buffer); read > 0; read = inputStream.read(buffer)) { - compressor.write(buffer, 0, read); - nativeIO.dropPartialFileFromCache(inputStream.getFD(), totalBytesRead, read, false); - totalBytesRead += read; + if (etc) { // fill out rest of day, same as final interval + long endOfDay = 24*60*60*1000; + long lasttime = longtimes[size-1]; + long interval = lasttime - longtimes[size-2]; + long moreneeded = (endOfDay - lasttime)/interval; + if (moreneeded > 0) { + int newsize = size + (int)moreneeded; + long[] temp = new long[newsize]; + for (i=0; i<size; i++) { + temp[i] = longtimes[i]; } - compressor.finish(); - compressor.flush(); - - } catch (IOException e) { - logger.warning("Got '" + e + "' while compressing '" + oldFile.getPath() + "'."); - } - oldFile.delete(); - nativeIO.dropFileFromCache(gzippedFile); - } - - /** - * Name files by date - create a symlink with a constant name to the newest file - */ - private void createSymlinkToCurrentFile() { - if (symlinkName == null) return; - File f = new File(fileName); - File f2 = new File(f.getParent(), symlinkName); - String[] cmd = new String[]{"/bin/ln", "-sf", f.getName(), f2.getPath()}; - try { - int retval = new ProcessExecuter().exec(cmd).getFirst(); - // Detonator pattern: Think of all the fun we can have if ln isn't what we - // think it is, if it doesn't return, etc, etc - if (retval != 0) { - logger.warning("Command '" + Arrays.toString(cmd) + "' + failed with exitcode=" + retval); + while (size < newsize) { + lasttime += interval; + temp[size++] = lasttime; } - } catch (IOException e) { - logger.warning("Got '" + e + "' while doing'" + Arrays.toString(cmd) + "'."); + longtimes = temp; } } - private static final long lengthOfDayMillis = 24 * 60 * 60 * 1000; - private static long timeOfDayMillis(long time) { - return time % lengthOfDayMillis; - } - + return longtimes; } - private static class Operation<LOGTYPE> { - enum Type {log, flush, close, rotate} - - ; + // Support staff :-) + private static final long lengthOfDayMillis = 24*60*60*1000; // ? is this close enough ? - final Type type; - - final Optional<LOGTYPE> log; - final CountDownLatch countDownLatch = new CountDownLatch(1); + private static long timeOfDayMillis ( long time ) { + return time % lengthOfDayMillis; + } - Operation(Type type) { - this(type, Optional.empty()); + /** + * Flushes all queued messages, interrupts the log thread in this and + * waits for it to end before returning + */ + void shutdown() { + logThread.interrupt(); + try { + logThread.join(); + executor.shutdown(); + executor.awaitTermination(600, TimeUnit.SECONDS); } - - Operation(LOGTYPE log) { - this(Type.log, Optional.of(log)); + catch (InterruptedException e) { } + } - private Operation(Type type, Optional<LOGTYPE> log) { - this.type = type; - this.log = log; - } + /** + * Only for unit testing. Do not use. + */ + String getFileName() { + return fileName; } -} +} diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java deleted file mode 100644 index fbf9bd1dc23..00000000000 --- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/ConnectionLogEntryTest.java +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.container.logging; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.time.Instant; -import java.util.UUID; - -/** - * @author mortent - */ -public class ConnectionLogEntryTest { - - @Test - public void test_serialization () throws IOException { - var id = UUID.randomUUID(); - var instant = Instant.parse("2021-01-13T12:12:12Z"); - ConnectionLogEntry entry = ConnectionLogEntry.builder(id, instant) - .withPeerPort(1234) - .build(); - - String expected = "{" + - "\"id\":\""+id.toString()+"\"," + - "\"timestamp\":\"2021-01-13T12:12:12Z\"," + - "\"peerPort\":1234" + - "}"; - Assert.assertEquals(expected, entry.toJson()); - } -} diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java new file mode 100644 index 00000000000..b8978fe489c --- /dev/null +++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/JsonConnectionLogWriterTest.java @@ -0,0 +1,36 @@ +package com.yahoo.container.logging;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +import com.yahoo.test.json.JsonTestHelper; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.UUID; + +/** + * @author bjorncs + */ +class JsonConnectionLogWriterTest { + + @Test + void test_serialization() throws IOException { + var id = UUID.randomUUID(); + var instant = Instant.parse("2021-01-13T12:12:12Z"); + ConnectionLogEntry entry = ConnectionLogEntry.builder(id, instant) + .withPeerPort(1234) + .build(); + String expectedJson = "{" + + "\"id\":\""+id.toString()+"\"," + + "\"timestamp\":\"2021-01-13T12:12:12Z\"," + + "\"peerPort\":1234" + + "}"; + + JsonConnectionLogWriter writer = new JsonConnectionLogWriter(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + writer.write(entry, out); + String actualJson = out.toString(StandardCharsets.UTF_8); + JsonTestHelper.assertJsonEquals(actualJson, expectedJson); + } +}
\ No newline at end of file diff --git a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java index cd3c174a12e..f76312af61e 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java +++ b/jdisc_http_service/src/test/java/com/yahoo/container/logging/LogFileHandlerTestCase.java @@ -42,14 +42,20 @@ public class LogFileHandlerTestCase { String pattern = root.getAbsolutePath() + "/logfilehandlertest.%Y%m%d%H%M%S"; long[] rTimes = {1000, 2000, 10000}; + Formatter formatter = new Formatter() { + public String format(LogRecord r) { + DateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss.SSS"); + String timeStamp = df.format(new Date(r.getMillis())); + return ("["+timeStamp+"]" + " " + formatMessage(r) + "\n"); + } + }; LogFileHandler<String> h = new LogFileHandler<>(Compression.NONE, pattern, rTimes, null, new StringLogWriter()); long now = System.currentTimeMillis(); long millisPerDay = 60*60*24*1000; long tomorrowDays = (now / millisPerDay) +1; long tomorrowMillis = tomorrowDays * millisPerDay; - - assertThat(tomorrowMillis+1000).isEqualTo(h.logThread.getNextRotationTime(tomorrowMillis)); - assertThat(tomorrowMillis+10000).isEqualTo(h.logThread.getNextRotationTime(tomorrowMillis+3000)); + assertThat(tomorrowMillis+1000).isEqualTo(h.getNextRotationTime(tomorrowMillis)); + assertThat(tomorrowMillis+10000).isEqualTo(h.getNextRotationTime(tomorrowMillis+3000)); String message = "test"; h.publish(message); h.publish( "another test"); @@ -121,7 +127,7 @@ public class LogFileHandlerTestCase { String longMessage = formatter.format(new LogRecord(Level.INFO, "string which is way longer than the word test")); handler.publish(longMessage); - handler.flush(); + handler.waitDrained(); assertThat(Files.size(Paths.get(firstFile))).isEqualTo(31); final long expectedSecondFileLength = 72; long secondFileLength; @@ -166,7 +172,7 @@ public class LogFileHandlerTestCase { for (int i = 0; i < logEntries; i++) { h.publish("test"); } - h.flush(); + h.waitDrained(); String f1 = h.getFileName(); assertThat(f1).startsWith(root.getAbsolutePath() + "/logfilehandlertest."); File uncompressed = new File(f1); |