diff options
5 files changed, 108 insertions, 84 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index 02d43104a3f..8c0f3e5fd80 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -762,6 +762,19 @@ ], "fields": [] }, + "com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream": { + "superClass": "com.yahoo.container.jdisc.ContentChannelOutputStream", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(com.yahoo.jdisc.handler.ContentChannel, long)", + "public void send(java.nio.ByteBuffer)", + "public void flush()" + ], + "fields": [] + }, "com.yahoo.container.jdisc.MetricConsumerFactory": { "superClass": "java.lang.Object", "interfaces": [], diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java new file mode 100644 index 00000000000..aec4eeecd7b --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/jdisc/MaxPendingContentChannelOutputStream.java @@ -0,0 +1,92 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc; + +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author baldersheim + */ +public class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { + + private final long maxPending; + private final AtomicLong sent = new AtomicLong(0); + private final AtomicLong acked = new AtomicLong(0); + + public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { + super(endpoint); + this.maxPending = maxPending; + } + + private long pendingBytes() { + return sent.get() - acked.get(); + } + + private class TrackCompletion implements CompletionHandler { + + private final long written; + private final AtomicBoolean replied = new AtomicBoolean(false); + + TrackCompletion(long written) { + this.written = written; + sent.addAndGet(written); + } + + @Override + public void completed() { + if (!replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + + @Override + public void failed(Throwable t) { + if (!replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + + } + + @Override + public void send(ByteBuffer src) throws IOException { + try { + stallWhilePendingAbove(maxPending); + } + catch (InterruptedException ignored) { + throw new InterruptedIOException("Interrupted waiting for IO"); + } + CompletionHandler pendingTracker = new TrackCompletion(src.remaining()); + try { + send(src, pendingTracker); + } + catch (Throwable throwable) { + pendingTracker.failed(throwable); + throw throwable; + } + } + + private void stallWhilePendingAbove(long pending) throws InterruptedException { + while (pendingBytes() > pending) { + Thread.sleep(1); + } + } + + @Override + public void flush() throws IOException { + super.flush(); + try { + stallWhilePendingAbove(0); + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for IO"); + } + } + +} diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java index 0bfe4afe07d..0c3c1e2120b 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java @@ -10,9 +10,6 @@ import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.handler.ResponseHandler; -import java.io.InterruptedIOException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.io.IOException; @@ -253,81 +250,4 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler } - /** - * @author baldersheim - */ - static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { - private final long maxPending; - private final AtomicLong sent = new AtomicLong(0); - private final AtomicLong acked = new AtomicLong(0); - - public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { - super(endpoint); - this.maxPending = maxPending; - } - - private long pendingBytes() { - return sent.get() - acked.get(); - } - - private class TrackCompletion implements CompletionHandler { - - private final long written; - private final AtomicBoolean replied = new AtomicBoolean(false); - - TrackCompletion(long written) { - this.written = written; - sent.addAndGet(written); - } - - @Override - public void completed() { - if ( ! replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - - @Override - public void failed(Throwable t) { - if ( ! replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - } - - @Override - public void send(ByteBuffer src) throws IOException { - try { - stallWhilePendingAbove(maxPending); - } catch (InterruptedException ignored) { - throw new InterruptedIOException("Interrupted waiting for IO"); - } - CompletionHandler pendingTracker = new TrackCompletion(src.remaining()); - try { - send(src, pendingTracker); - } catch (Throwable throwable) { - pendingTracker.failed(throwable); - throw throwable; - } - } - - private void stallWhilePendingAbove(long pending) throws InterruptedException { - while (pendingBytes() > pending) { - Thread.sleep(1); - } - } - - @Override - public void flush() throws IOException { - super.flush(); - try { - stallWhilePendingAbove(0); - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for IO"); - } - } - - } - } diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java index d1036ce0e45..a9b16799aea 100644 --- a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java +++ b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.jdisc; -import com.yahoo.container.jdisc.ThreadedHttpRequestHandler.MaxPendingContentChannelOutputStream; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.application.ContainerBuilder; @@ -20,11 +19,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.Phaser; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index f1d6b4825c6..865aed221c5 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -9,6 +9,7 @@ import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; import com.yahoo.container.jdisc.ContentChannelOutputStream; +import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; @@ -582,7 +583,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException { this.handler = handler; - out = new ContentChannelOutputStream(buffer); + out = streaming ? new MaxPendingContentChannelOutputStream(buffer, 1 << 24) + : new ContentChannelOutputStream(buffer); json = jsonFactory.createGenerator(out); json.writeStartObject(); } |