aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-04-14 16:28:11 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-04-14 16:28:11 +0200
commitf62d7b095c353508103f0f038e1f957435d54a38 (patch)
tree11ae6f185ae78179011bec39b6d5fdfe889c1dc6 /container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
parentfd9b726786f4c00b276f2d84fd0a3593a0c406eb (diff)
Move MaxPendingContentChannelStream to container-core and use it thorugh max pending on HttpResponse
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/LogHandler.java')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/LogHandler.java80
1 files changed, 6 insertions, 74 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
index 991cd83ffa8..f1ba68ff3c8 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
@@ -11,6 +11,7 @@ import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
@@ -23,7 +24,7 @@ import java.util.logging.Level;
public class LogHandler extends ThreadedHttpRequestHandler {
private final LogReader logReader;
- private static final long MB = 1024*1024;
+ private static final long MB = 1024 * 1024;
@Inject
public LogHandler(Executor executor, LogHandlerConfig config) {
@@ -45,11 +46,11 @@ public class LogHandler extends ThreadedHttpRequestHandler {
return new AsyncHttpResponse(200) {
@Override
+ public long maxPendingBytes() { return MB; }
+ @Override
public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
- try {
- OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB);
- logReader.writeLogs(blockingOutput, from, to, hostname);
- blockingOutput.close();
+ try (output) {
+ logReader.writeLogs(output, from, to, hostname);
}
catch (Throwable t) {
log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t);
@@ -62,74 +63,5 @@ public class LogHandler extends ThreadedHttpRequestHandler {
}
- private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
- private final long maxPending;
- private final AtomicLong sent = new AtomicLong(0);
- private final AtomicLong acked = new AtomicLong(0);
-
- public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
- super(endpoint);
- this.maxPending = maxPending;
- }
-
- private long pendingBytes() {
- return sent.get() - acked.get();
- }
-
- private class TrackCompletition implements CompletionHandler {
- private final long written;
- private final AtomicBoolean replied = new AtomicBoolean(false);
- TrackCompletition(long written) {
- this.written = written;
- sent.addAndGet(written);
- }
- @Override
- public void completed() {
- if (!replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
-
- @Override
- public void failed(Throwable t) {
- if (!replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
- }
- @Override
- public void send(ByteBuffer src) throws IOException {
- try {
- stallWhilePendingAbove(maxPending);
- } catch (InterruptedException ignored) {
- throw new IOException("Interrupted waiting for IO");
- }
- CompletionHandler pendingTracker = new TrackCompletition(src.remaining());
- try {
- send(src, pendingTracker);
- } catch (Throwable throwable) {
- pendingTracker.failed(throwable);
- throw throwable;
- }
- }
-
- private void stallWhilePendingAbove(long pending) throws InterruptedException {
- while (pendingBytes() > pending) {
- Thread.sleep(1);
- }
- }
-
- @Override
- public void flush() throws IOException {
- super.flush();
- try {
- stallWhilePendingAbove(0);
- }
- catch (InterruptedException e) {
- throw new IOException("Interrupted waiting for IO");
- }
- }
-
- }
}