From dfa4778dede2e0823586a0bb16c1dcc290a4f93f Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 15 Jun 2020 13:38:17 +0200 Subject: Let Jetty do compression, and merge overlapping time windows --- .../com/yahoo/container/handler/LogHandler.java | 3 - .../com/yahoo/container/handler/LogReader.java | 139 ++++++++++++++------- .../yahoo/container/handler/LogHandlerTest.java | 6 +- .../com/yahoo/container/handler/LogReaderTest.java | 12 +- 4 files changed, 100 insertions(+), 60 deletions(-) diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index 0b42b3a481b..1d6e1a0893d 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -35,9 +35,6 @@ public class LogHandler extends ThreadedHttpRequestHandler { .map(Long::valueOf).map(Instant::ofEpochMilli).orElse(Instant.MAX); return new HttpResponse(200) { - { - headers().add("Content-Encoding", "gzip"); - } @Override public void render(OutputStream outputStream) { logReader.writeLogs(outputStream, from, to); diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java index 09330542bea..1324a37f357 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java @@ -1,12 +1,12 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.handler; +import com.google.common.collect.Iterators; import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; import java.io.BufferedReader; import java.io.BufferedWriter; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -23,15 +23,15 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import static java.nio.charset.StandardCharsets.UTF_8; @@ -57,53 +57,102 @@ class LogReader { this.logFilePattern = logFilePattern; } - void writeLogs(OutputStream outputStream, Instant from, Instant to) { + void writeLogs(OutputStream out, Instant from, Instant to) { + double fromSeconds = from.getEpochSecond() + from.getNano() / 1e9; + double toSeconds = to.getEpochSecond() + to.getNano() / 1e9; + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out)); try { - List> logs = getMatchingFiles(from, to); - for (int i = 0; i < logs.size(); i++) { - for (Path log : logs.get(i)) { - boolean zipped = log.toString().endsWith(".gz"); - try (InputStream in = Files.newInputStream(log)) { - InputStream inProxy; - - // If the log needs filtering, possibly unzip (and rezip) it, and filter its lines on timestamp. - // When multiple log files exist for the same instant, their entries should ideally be sorted. This is not done here. - if (i == 0 || i == logs.size() - 1) { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (BufferedReader reader = new BufferedReader(new InputStreamReader(zipped ? new GZIPInputStream(in) : in, UTF_8)); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(zipped ? new GZIPOutputStream(buffer) : buffer, UTF_8))) { - for (String line; (line = reader.readLine()) != null; ) { - String[] parts = line.split("\t"); - if (parts.length != 7) - continue; - - Instant at = Instant.EPOCH.plus((long) (Double.parseDouble(parts[0]) * 1_000_000), ChronoUnit.MICROS); - if (at.isAfter(from) && !at.isAfter(to)) { - writer.write(line); - writer.newLine(); - } - } - } - inProxy = new ByteArrayInputStream(buffer.toByteArray()); - } - else - inProxy = in; - - if ( ! zipped) { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (OutputStream outProxy = new GZIPOutputStream(buffer)) { - inProxy.transferTo(outProxy); - } - inProxy = new ByteArrayInputStream(buffer.toByteArray()); - } - inProxy.transferTo(outputStream); + for (List logs : getMatchingFiles(from, to)) { + List logLineIterators = new ArrayList<>(); + try { + // Logs in each sub-list contain entries covering the same time interval, so do a merge sort while reading + for (Path log : logs) + logLineIterators.add(new LogLineIterator(log, fromSeconds, toSeconds)); + + Iterator lines = Iterators.mergeSorted(logLineIterators, + Comparator.comparingDouble(LineWithTimestamp::timestamp)); + while (lines.hasNext()) { + writer.write(lines.next().line()); + writer.newLine(); + } + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + finally { + for (LogLineIterator ll : logLineIterators) { + try { ll.close(); } catch (IOException ignored) { } } } } } - catch (IOException e) { - throw new UncheckedIOException(e); + finally { + Exceptions.uncheck(writer::flush); + } + } + + private static class LogLineIterator implements Iterator, AutoCloseable { + + private final BufferedReader reader; + private final double from; + private final double to; + private LineWithTimestamp next; + + private LogLineIterator(Path log, double from, double to) throws IOException { + boolean zipped = log.toString().endsWith(".gz"); + InputStream in = Files.newInputStream(log); + this.reader = new BufferedReader(new InputStreamReader(zipped ? new GZIPInputStream(in) : in, UTF_8)); + this.from = from; + this.to = to; + this.next = readNext(); + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public LineWithTimestamp next() { + LineWithTimestamp current = next; + next = readNext(); + return current; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + private LineWithTimestamp readNext() { + try { + for (String line; (line = reader.readLine()) != null; ) { + String[] parts = line.split("\t"); + if (parts.length != 7) + continue; + + double timestamp = Double.parseDouble(parts[0]); + if (timestamp > from && timestamp <= to) + return new LineWithTimestamp(line, timestamp); + } + return null; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + } + + private static class LineWithTimestamp { + final String line; + final double timestamp; + LineWithTimestamp(String line, double timestamp) { + this.line = line; + this.timestamp = timestamp; } + String line() { return line; } + double timestamp() { return timestamp; } } /** Returns log files which may have relevant entries, grouped and sorted by {@link #extractTimestamp(Path)} — the first and last group must be filtered. */ diff --git a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java index 01dcb885a97..ab0d0d54675 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java +++ b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java @@ -47,12 +47,12 @@ public class LogHandlerTest { } @Override - protected void writeLogs(OutputStream outputStream, Instant from, Instant to) { + protected void writeLogs(OutputStream out, Instant from, Instant to) { try { if (to.isAfter(Instant.ofEpochMilli(1000))) { - outputStream.write("newer log".getBytes()); + out.write("newer log".getBytes()); } else { - outputStream.write("older log".getBytes()); + out.write("older log".getBytes()); } } catch (Exception e) {} } diff --git a/container-core/src/test/java/com/yahoo/container/handler/LogReaderTest.java b/container-core/src/test/java/com/yahoo/container/handler/LogReaderTest.java index 6b7be1d8cd3..3f7a78e13be 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/LogReaderTest.java +++ b/container-core/src/test/java/com/yahoo/container/handler/LogReaderTest.java @@ -57,7 +57,7 @@ public class LogReaderTest { LogReader logReader = new LogReader(logDirectory, Pattern.compile(".*")); logReader.writeLogs(baos, Instant.ofEpochMilli(150), Instant.ofEpochMilli(3601050)); - assertEquals(log100 + logv11 + log110, decompress(baos.toByteArray())); + assertEquals(log100 + logv11 + log110, baos.toString(UTF_8)); } @Test @@ -66,7 +66,7 @@ public class LogReaderTest { LogReader logReader = new LogReader(logDirectory, Pattern.compile(".*-1.*")); logReader.writeLogs(baos, Instant.EPOCH, Instant.EPOCH.plus(Duration.ofDays(2))); - assertEquals(log101 + logv11, decompress(baos.toByteArray())); + assertEquals(log101 + logv11, baos.toString(UTF_8)); } @Test @@ -75,7 +75,7 @@ public class LogReaderTest { LogReader logReader = new LogReader(logDirectory, Pattern.compile(".*")); logReader.writeLogs(zippedBaos, Instant.EPOCH, Instant.EPOCH.plus(Duration.ofDays(2))); - assertEquals(log100 + log101 + logv11 + log110 + log200 + logv, decompress(zippedBaos.toByteArray())); + assertEquals(log101 + log100 + logv11 + log110 + log200 + logv, zippedBaos.toString(UTF_8)); } private byte[] compress(String input) throws IOException { @@ -86,10 +86,4 @@ public class LogReaderTest { return baos.toByteArray(); } - private String decompress(byte[] input) throws IOException { - if (input.length == 0) return ""; - byte[] decompressed = new GZIPInputStream(new ByteArrayInputStream(input)).readAllBytes(); - return new String(decompressed); - } - } -- cgit v1.2.3 From 32cbecbee0499e988c755f45919ef8f1bfb0e031 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 15 Jun 2020 14:35:20 +0200 Subject: Return immediately at end of time window --- .../src/main/java/com/yahoo/container/handler/LogReader.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java index 1324a37f357..3cf849a6835 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java @@ -132,7 +132,10 @@ class LogReader { continue; double timestamp = Double.parseDouble(parts[0]); - if (timestamp > from && timestamp <= to) + if (timestamp > to) + return null; + + if (timestamp >= from) return new LineWithTimestamp(line, timestamp); } return null; -- cgit v1.2.3