summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/jdisc
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-04-14 16:28:11 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-04-14 16:28:11 +0200
commitf62d7b095c353508103f0f038e1f957435d54a38 (patch)
tree11ae6f185ae78179011bec39b6d5fdfe889c1dc6 /container-core/src/main/java/com/yahoo/container/jdisc
parentfd9b726786f4c00b276f2d84fd0a3593a0c406eb (diff)
Move MaxPendingContentChannelStream to container-core and use it thorugh max pending on HttpResponse
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/jdisc')
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java3
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java85
2 files changed, 87 insertions, 1 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java b/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java
index a6042c541c0..5df40a90fe6 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java
@@ -40,6 +40,9 @@ public abstract class HttpResponse {
/** Marshals this response to the network layer. The caller is responsible for flushing and closing outputStream. */
public abstract void render(OutputStream outputStream) throws IOException;
+ /** The amount of content bytes this response may have in-flight (if positive) before response rendering blocks. */
+ public long maxPendingBytes() { return -1; }
+
/**
* Returns the numeric HTTP status code, e.g. 200, 404 and so on.
*
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
index 9687697d6f6..5b8fe907293 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
@@ -9,6 +9,10 @@ import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.handler.ResponseHandler;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.io.IOException;
@@ -97,7 +101,8 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler
LoggingCompletionHandler logOnCompletion = null;
ContentChannelOutputStream output = null;
try {
- output = new ContentChannelOutputStream(channel);
+ output = httpResponse.maxPendingBytes() > 0 ? new MaxPendingContentChannelOutputStream(channel, httpResponse.maxPendingBytes())
+ : new ContentChannelOutputStream(channel);
logOnCompletion = createLoggingCompletionHandler(startTime, System.currentTimeMillis(),
httpResponse, request, output);
@@ -247,4 +252,82 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler
return (com.yahoo.jdisc.http.HttpRequest) request;
}
+
+ /**
+ * @author baldersheim
+ */
+ static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
+ private final long maxPending;
+ private final AtomicLong sent = new AtomicLong(0);
+ private final AtomicLong acked = new AtomicLong(0);
+
+ public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
+ super(endpoint);
+ this.maxPending = maxPending;
+ }
+
+ private long pendingBytes() {
+ return sent.get() - acked.get();
+ }
+
+ private class TrackCompletion implements CompletionHandler {
+
+ private final long written;
+ private final AtomicBoolean replied = new AtomicBoolean(false);
+
+ TrackCompletion(long written) {
+ this.written = written;
+ sent.addAndGet(written);
+ }
+
+ @Override
+ public void completed() {
+ if ( ! replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ if ( ! replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+ }
+
+ @Override
+ public void send(ByteBuffer src) throws IOException {
+ try {
+ stallWhilePendingAbove(maxPending);
+ } catch (InterruptedException ignored) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
+ try {
+ send(src, pendingTracker);
+ } catch (Throwable throwable) {
+ pendingTracker.failed(throwable);
+ throw throwable;
+ }
+ }
+
+ private void stallWhilePendingAbove(long pending) throws InterruptedException {
+ while (pendingBytes() > pending) {
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ try {
+ stallWhilePendingAbove(0);
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ }
+
+ }
+
}