diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-17 17:29:51 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-17 17:39:12 +0100 |
commit | 6856c7448dcdae692f60aa65a79ec34c984b0586 (patch) | |
tree | 70305bdd01a82e8fcf6d3bd984ce2bb59cf50ea4 /container-core | |
parent | 1e4291ddeeb9072b49e64ef962f61a28dd256b9b (diff) |
Track sent and acked bytes to ensure a smooth flow of bytes not exceeding maxpending by more than
than a small byte buffer.
Diffstat (limited to 'container-core')
4 files changed, 61 insertions, 23 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index 4b23eafaa9c..ad03f8405ac 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -15,13 +15,14 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.time.Instant; import java.util.Optional; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; public class LogHandler extends ThreadedHttpRequestHandler { private final LogReader logReader; + private static final long MB = 1024*1024; @Inject public LogHandler(Executor executor, LogHandlerConfig config) { @@ -45,7 +46,7 @@ public class LogHandler extends ThreadedHttpRequestHandler { @Override public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) { try { - OutputStream blockingOutput = new BlockingFlushContentChannelOutputStream(networkChannel); + OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB); logReader.writeLogs(blockingOutput, from, to, hostname); blockingOutput.close(); } @@ -60,26 +61,57 @@ public class LogHandler extends ThreadedHttpRequestHandler { } - private static class BlockingFlushContentChannelOutputStream extends ContentChannelOutputStream { - + private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { private final ContentChannel channel; + private final long maxPending; + private AtomicLong sent = new AtomicLong(0); + private AtomicLong acked = new AtomicLong(0); - public BlockingFlushContentChannelOutputStream(ContentChannel endpoint) { + public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { super(endpoint); this.channel = endpoint; + this.maxPending = maxPending; + } + + private long pendingBytes() { + return sent.get() - acked.get(); + } + + private class TrackCompletition implements CompletionHandler { + final long written; + TrackCompletition(long written) { + this.written = written; + sent.addAndGet(written); + } + @Override + public void completed() { + acked.addAndGet(written); + } + + @Override + public void failed(Throwable t) { + acked.addAndGet(written); + } + } + @Override + public void send(ByteBuffer src) throws IOException { + try { + stallWhilePendingAbove(maxPending); + } catch (InterruptedException e) {} + send(src, new TrackCompletition(src.remaining())); + } + + private void stallWhilePendingAbove(long pending) throws InterruptedException { + while (pendingBytes() > pending) { + Thread.sleep(1); + } } @Override public void flush() throws IOException { super.flush(); - CountDownLatch latch = new CountDownLatch(1); - channel.write(ByteBuffer.allocate(0), // :'( - new CompletionHandler() { - @Override public void completed() { latch.countDown(); } - @Override public void failed(Throwable t) { latch.countDown(); } - }); try { - latch.await(); + stallWhilePendingAbove(0); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for underlying IO to complete", e); diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java index a7fcb8a9f71..af83f97fd9b 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogReader.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogReader.java @@ -72,16 +72,10 @@ class LogReader { Iterator<LineWithTimestamp> lines = Iterators.mergeSorted(logLineIterators, Comparator.comparingDouble(LineWithTimestamp::timestamp)); - long charsWritten = 0; while (lines.hasNext()) { String line = lines.next().line(); writer.write(line); writer.newLine(); - charsWritten += line.length(); - if (charsWritten > 0x100000) { - writer.flush(); - charsWritten = 0; - } } } catch (IOException e) { diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java index 1d4c20efe5e..270da0c4ab0 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java @@ -12,7 +12,6 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -125,9 +124,13 @@ public class ContentChannelOutputStream extends OutputStream implements Writable @Override public void send(ByteBuffer src) throws IOException { // Don't do a buffer.flush() from here, this method is used by the buffer itself + send(src, null); + } + + protected void send(ByteBuffer src, CompletionHandler completionHandler) throws IOException { try { byteBufferData += src.remaining(); - endpoint.write(src, new LoggingCompletionHandler()); + endpoint.write(src, new LoggingCompletionHandler(completionHandler)); } catch (RuntimeException e) { throw new IOException(Exceptions.toMessageString(e), e); } @@ -138,10 +141,16 @@ public class ContentChannelOutputStream extends OutputStream implements Writable return buffer.appended() + byteBufferData; } - class LoggingCompletionHandler implements CompletionHandler { - + private class LoggingCompletionHandler implements CompletionHandler { + private final CompletionHandler nested; + LoggingCompletionHandler(CompletionHandler nested) { + this.nested = nested; + } @Override public void completed() { + if (nested != null) { + nested.completed(); + } } @Override @@ -158,6 +167,9 @@ public class ContentChannelOutputStream extends OutputStream implements Writable if (log.isLoggable(logLevel)) { log.log(logLevel, "Got exception when writing to client: " + Exceptions.toMessageString(t)); } + if (nested != null) { + nested.failed(t); + } } } diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java index 22e55eb5291..bdaf6f7919f 100644 --- a/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java +++ b/container-core/src/test/java/com/yahoo/container/jdisc/LoggingTestCase.java @@ -96,7 +96,7 @@ public class LoggingTestCase { } @Test - public final void testFailed() throws IOException, InterruptedException { + public final void testFailed() throws IOException { stream.send(createData()); stream.send(createData()); stream.send(createData()); |