diff options
author | Øyvind Grønnesby <oyving@verizonmedia.com> | 2021-04-28 12:21:57 +0200 |
---|---|---|
committer | Øyvind Grønnesby <oyving@verizonmedia.com> | 2021-04-28 12:21:57 +0200 |
commit | dd2ef6cfc4d3d6e3735d1cb553f7ae2560a7f1ff (patch) | |
tree | e1ba3a56439bb3c16022b60d2a7ab3534037827e /container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java | |
parent | a0db2b1020ea53aa356a7547a23d4e1dfaa851c0 (diff) | |
parent | e79af49a3159e5505cd3e5f2605c299d38fe40cd (diff) |
Merge remote-tracking branch 'origin/master' into ogronnesby/billing-api-v2
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java')
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java | 87 |
1 files changed, 85 insertions, 2 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java index 9687697d6f6..be708f2fc94 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java @@ -9,6 +9,10 @@ import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.handler.ResponseHandler; + +import java.io.InterruptedIOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.io.IOException; @@ -28,7 +32,7 @@ import java.util.logging.Logger; * @author Steinar Knutsen * @author bratseth */ -public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler { +public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler implements HttpRequestHandler { public static final String CONTENT_TYPE = "Content-Type"; private static final String RENDERING_ERRORS = "rendering_errors"; @@ -97,7 +101,8 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler LoggingCompletionHandler logOnCompletion = null; ContentChannelOutputStream output = null; try { - output = new ContentChannelOutputStream(channel); + output = httpResponse.maxPendingBytes() > 0 ? new MaxPendingContentChannelOutputStream(channel, httpResponse.maxPendingBytes()) + : new ContentChannelOutputStream(channel); logOnCompletion = createLoggingCompletionHandler(startTime, System.currentTimeMillis(), httpResponse, request, output); @@ -247,4 +252,82 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler return (com.yahoo.jdisc.http.HttpRequest) request; } + + /** + * @author baldersheim + */ + static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { + private final long maxPending; + private final AtomicLong sent = new AtomicLong(0); + private final AtomicLong acked = new AtomicLong(0); + + public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { + super(endpoint); + this.maxPending = maxPending; + } + + private long pendingBytes() { + return sent.get() - acked.get(); + } + + private class TrackCompletion implements CompletionHandler { + + private final long written; + private final AtomicBoolean replied = new AtomicBoolean(false); + + TrackCompletion(long written) { + this.written = written; + sent.addAndGet(written); + } + + @Override + public void completed() { + if ( ! replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + + @Override + public void failed(Throwable t) { + if ( ! replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + } + + @Override + public void send(ByteBuffer src) throws IOException { + try { + stallWhilePendingAbove(maxPending); + } catch (InterruptedException ignored) { + throw new InterruptedIOException("Interrupted waiting for IO"); + } + CompletionHandler pendingTracker = new TrackCompletion(src.remaining()); + try { + send(src, pendingTracker); + } catch (Throwable throwable) { + pendingTracker.failed(throwable); + throw throwable; + } + } + + private void stallWhilePendingAbove(long pending) throws InterruptedException { + while (pendingBytes() > pending) { + Thread.sleep(1); + } + } + + @Override + public void flush() throws IOException { + super.flush(); + try { + stallWhilePendingAbove(0); + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for IO"); + } + } + + } + } |