diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-04-14 16:28:11 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-04-14 16:28:11 +0200 |
commit | f62d7b095c353508103f0f038e1f957435d54a38 (patch) | |
tree | 11ae6f185ae78179011bec39b6d5fdfe889c1dc6 /container-core/src/main/java/com/yahoo/container/handler/LogHandler.java | |
parent | fd9b726786f4c00b276f2d84fd0a3593a0c406eb (diff) |
Move MaxPendingContentChannelStream to container-core and use it thorugh max pending on HttpResponse
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/LogHandler.java')
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/handler/LogHandler.java | 80 |
1 files changed, 6 insertions, 74 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index 991cd83ffa8..f1ba68ff3c8 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -11,6 +11,7 @@ import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.time.Instant; @@ -23,7 +24,7 @@ import java.util.logging.Level; public class LogHandler extends ThreadedHttpRequestHandler { private final LogReader logReader; - private static final long MB = 1024*1024; + private static final long MB = 1024 * 1024; @Inject public LogHandler(Executor executor, LogHandlerConfig config) { @@ -45,11 +46,11 @@ public class LogHandler extends ThreadedHttpRequestHandler { return new AsyncHttpResponse(200) { @Override + public long maxPendingBytes() { return MB; } + @Override public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) { - try { - OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB); - logReader.writeLogs(blockingOutput, from, to, hostname); - blockingOutput.close(); + try (output) { + logReader.writeLogs(output, from, to, hostname); } catch (Throwable t) { log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t); @@ -62,74 +63,5 @@ public class LogHandler extends ThreadedHttpRequestHandler { } - private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { - private final long maxPending; - private final AtomicLong sent = new AtomicLong(0); - private final AtomicLong acked = new AtomicLong(0); - - public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { - super(endpoint); - this.maxPending = maxPending; - } - - private long pendingBytes() { - return sent.get() - acked.get(); - } - - private class TrackCompletition implements CompletionHandler { - private final long written; - private final AtomicBoolean replied = new AtomicBoolean(false); - TrackCompletition(long written) { - this.written = written; - sent.addAndGet(written); - } - @Override - public void completed() { - if (!replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - - @Override - public void failed(Throwable t) { - if (!replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - } - @Override - public void send(ByteBuffer src) throws IOException { - try { - stallWhilePendingAbove(maxPending); - } catch (InterruptedException ignored) { - throw new IOException("Interrupted waiting for IO"); - } - CompletionHandler pendingTracker = new TrackCompletition(src.remaining()); - try { - send(src, pendingTracker); - } catch (Throwable throwable) { - pendingTracker.failed(throwable); - throw throwable; - } - } - - private void stallWhilePendingAbove(long pending) throws InterruptedException { - while (pendingBytes() > pending) { - Thread.sleep(1); - } - } - - @Override - public void flush() throws IOException { - super.flush(); - try { - stallWhilePendingAbove(0); - } - catch (InterruptedException e) { - throw new IOException("Interrupted waiting for IO"); - } - } - - } } |