diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-04-14 17:34:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-14 17:34:21 +0200 |
commit | ea6ad3bfa7c7adfcc299809eaf431d2521a91656 (patch) | |
tree | ee78a01012b222b19a381b345d01af42c69d5d86 | |
parent | 8a6aeb13c413927b5012e5f927f54a7650a7db5a (diff) | |
parent | f62d7b095c353508103f0f038e1f957435d54a38 (diff) |
Merge pull request #17430 from vespa-engine/jonmv/max-pending-http-response
Move MaxPendingContentChannelStream to container-core and use it thor…
6 files changed, 146 insertions, 78 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index e333621df0e..c133c9cc158 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -606,6 +606,7 @@ "methods": [ "public void <init>(int)", "public abstract void render(java.io.OutputStream)", + "public long maxPendingBytes()", "public int getStatus()", "public void setStatus(int)", "public com.yahoo.jdisc.HeaderFields headers()", diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java index 991cd83ffa8..f1ba68ff3c8 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java +++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java @@ -11,6 +11,7 @@ import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.time.Instant; @@ -23,7 +24,7 @@ import java.util.logging.Level; public class LogHandler extends ThreadedHttpRequestHandler { private final LogReader logReader; - private static final long MB = 1024*1024; + private static final long MB = 1024 * 1024; @Inject public LogHandler(Executor executor, LogHandlerConfig config) { @@ -45,11 +46,11 @@ public class LogHandler extends ThreadedHttpRequestHandler { return new AsyncHttpResponse(200) { @Override + public long maxPendingBytes() { return MB; } + @Override public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) { - try { - OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB); - logReader.writeLogs(blockingOutput, from, to, hostname); - blockingOutput.close(); + try (output) { + logReader.writeLogs(output, from, to, hostname); } catch (Throwable t) { log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t); @@ -62,74 +63,5 @@ public class LogHandler extends ThreadedHttpRequestHandler { } - private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { - private final long maxPending; - private final AtomicLong sent = new AtomicLong(0); - private final AtomicLong acked = new AtomicLong(0); - - public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { - super(endpoint); - this.maxPending = maxPending; - } - - private long pendingBytes() { - return sent.get() - acked.get(); - } - - private class TrackCompletition implements CompletionHandler { - private final long written; - private final AtomicBoolean replied = new AtomicBoolean(false); - TrackCompletition(long written) { - this.written = written; - sent.addAndGet(written); - } - @Override - public void completed() { - if (!replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - - @Override - public void failed(Throwable t) { - if (!replied.getAndSet(true)) { - acked.addAndGet(written); - } - } - } - @Override - public void send(ByteBuffer src) throws IOException { - try { - stallWhilePendingAbove(maxPending); - } catch (InterruptedException ignored) { - throw new IOException("Interrupted waiting for IO"); - } - CompletionHandler pendingTracker = new TrackCompletition(src.remaining()); - try { - send(src, pendingTracker); - } catch (Throwable throwable) { - pendingTracker.failed(throwable); - throw throwable; - } - } - - private void stallWhilePendingAbove(long pending) throws InterruptedException { - while (pendingBytes() > pending) { - Thread.sleep(1); - } - } - - @Override - public void flush() throws IOException { - super.flush(); - try { - stallWhilePendingAbove(0); - } - catch (InterruptedException e) { - throw new IOException("Interrupted waiting for IO"); - } - } - - } } diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java b/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java index a6042c541c0..5df40a90fe6 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java @@ -40,6 +40,9 @@ public abstract class HttpResponse { /** Marshals this response to the network layer. The caller is responsible for flushing and closing outputStream. */ public abstract void render(OutputStream outputStream) throws IOException; + /** The amount of content bytes this response may have in-flight (if positive) before response rendering blocks. */ + public long maxPendingBytes() { return -1; } + /** * Returns the numeric HTTP status code, e.g. 200, 404 and so on. * diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java index 9687697d6f6..5b8fe907293 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java @@ -9,6 +9,10 @@ import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.handler.ResponseHandler; + +import java.io.InterruptedIOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.io.IOException; @@ -97,7 +101,8 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler LoggingCompletionHandler logOnCompletion = null; ContentChannelOutputStream output = null; try { - output = new ContentChannelOutputStream(channel); + output = httpResponse.maxPendingBytes() > 0 ? new MaxPendingContentChannelOutputStream(channel, httpResponse.maxPendingBytes()) + : new ContentChannelOutputStream(channel); logOnCompletion = createLoggingCompletionHandler(startTime, System.currentTimeMillis(), httpResponse, request, output); @@ -247,4 +252,82 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler return (com.yahoo.jdisc.http.HttpRequest) request; } + + /** + * @author baldersheim + */ + static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream { + private final long maxPending; + private final AtomicLong sent = new AtomicLong(0); + private final AtomicLong acked = new AtomicLong(0); + + public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) { + super(endpoint); + this.maxPending = maxPending; + } + + private long pendingBytes() { + return sent.get() - acked.get(); + } + + private class TrackCompletion implements CompletionHandler { + + private final long written; + private final AtomicBoolean replied = new AtomicBoolean(false); + + TrackCompletion(long written) { + this.written = written; + sent.addAndGet(written); + } + + @Override + public void completed() { + if ( ! replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + + @Override + public void failed(Throwable t) { + if ( ! replied.getAndSet(true)) { + acked.addAndGet(written); + } + } + } + + @Override + public void send(ByteBuffer src) throws IOException { + try { + stallWhilePendingAbove(maxPending); + } catch (InterruptedException ignored) { + throw new InterruptedIOException("Interrupted waiting for IO"); + } + CompletionHandler pendingTracker = new TrackCompletion(src.remaining()); + try { + send(src, pendingTracker); + } catch (Throwable throwable) { + pendingTracker.failed(throwable); + throw throwable; + } + } + + private void stallWhilePendingAbove(long pending) throws InterruptedException { + while (pendingBytes() > pending) { + Thread.sleep(1); + } + } + + @Override + public void flush() throws IOException { + super.flush(); + try { + stallWhilePendingAbove(0); + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for IO"); + } + } + + } + } diff --git a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java index afe57579a97..38683c75375 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java +++ b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java @@ -2,6 +2,7 @@ package com.yahoo.container.handler; import com.yahoo.container.jdisc.AsyncHttpResponse; +import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.jdisc.handler.ReadableContentChannel; import com.yahoo.yolean.Exceptions; @@ -28,7 +29,7 @@ public class LogHandlerTest { String uri = "http://myhost.com:1111/logs?from=1000&to=2000"; AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET)); ReadableContentChannel out = new ReadableContentChannel(); - new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start(); + new Thread(() -> Exceptions.uncheck(() -> response.render(new ContentChannelOutputStream(out), out, null))).start(); String expectedResponse = "newer log"; assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8)); } @@ -37,7 +38,7 @@ public class LogHandlerTest { String uri = "http://myhost.com:1111/logs?from=0&to=1000"; AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET)); ReadableContentChannel out = new ReadableContentChannel(); - new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start(); + new Thread(() -> Exceptions.uncheck(() -> response.render(new ContentChannelOutputStream(out), out, null))).start(); String expectedResponse = "older log"; assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8)); } diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java index 331c536a531..cfea0f5c38b 100644 --- a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java +++ b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java @@ -1,22 +1,30 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.jdisc; +import com.yahoo.container.jdisc.ThreadedHttpRequestHandler.MaxPendingContentChannelOutputStream; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.application.ContainerBuilder; import com.yahoo.jdisc.handler.*; import com.yahoo.jdisc.test.TestDriver; -import org.junit.Ignore; +import com.yahoo.yolean.Exceptions; import org.junit.Test; +import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -353,4 +361,44 @@ public class ThreadedRequestHandlerTestCase { latch.countDown(); } } + + @Test + public void testMaxPendingOutputStream() throws IOException, ExecutionException, InterruptedException { + ReadableContentChannel buffer = new ReadableContentChannel(); + MaxPendingContentChannelOutputStream limited = new MaxPendingContentChannelOutputStream(buffer, 2); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + + limited.send(ByteBuffer.allocate(2)); + limited.send(ByteBuffer.allocate(1)); // 2 is not > 2, so OK. + + // Next write will block. + Future<?> future = executor.submit(() -> Exceptions.uncheck(() -> limited.send(ByteBuffer.allocate(0)))); + try { + future.get(100, TimeUnit.MILLISECONDS); + fail("Should not be able to write now"); + } + catch (TimeoutException expected) { } + + // Free buffer capacity, so write completes, then drain buffer. + assertEquals(2, buffer.read().capacity()); + future.get(); + buffer.close(null); + assertEquals(1, buffer.read().capacity()); + assertEquals(0, buffer.read().capacity()); + assertNull(buffer.read()); + + // Buffer is closed, so further writes fail. This does not count towards pending bytes. + try { + limited.send(ByteBuffer.allocate(3)); + fail("Should throw"); + } + catch (IOException expected) { } + try { + limited.send(ByteBuffer.allocate(3)); + fail("Should throw"); + } + catch (IOException expected) { } + } + } |