summaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-04-14 17:34:21 +0200
committerGitHub <noreply@github.com>2021-04-14 17:34:21 +0200
commitea6ad3bfa7c7adfcc299809eaf431d2521a91656 (patch)
treeee78a01012b222b19a381b345d01af42c69d5d86 /container-core
parent8a6aeb13c413927b5012e5f927f54a7650a7db5a (diff)
parentf62d7b095c353508103f0f038e1f957435d54a38 (diff)
Merge pull request #17430 from vespa-engine/jonmv/max-pending-http-response
Move MaxPendingContentChannelStream to container-core and use it thor…
Diffstat (limited to 'container-core')
-rw-r--r--container-core/abi-spec.json1
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/LogHandler.java80
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java3
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java85
-rw-r--r--container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java5
-rw-r--r--container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java50
6 files changed, 146 insertions, 78 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json
index e333621df0e..c133c9cc158 100644
--- a/container-core/abi-spec.json
+++ b/container-core/abi-spec.json
@@ -606,6 +606,7 @@
"methods": [
"public void <init>(int)",
"public abstract void render(java.io.OutputStream)",
+ "public long maxPendingBytes()",
"public int getStatus()",
"public void setStatus(int)",
"public com.yahoo.jdisc.HeaderFields headers()",
diff --git a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
index 991cd83ffa8..f1ba68ff3c8 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/LogHandler.java
@@ -11,6 +11,7 @@ import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
@@ -23,7 +24,7 @@ import java.util.logging.Level;
public class LogHandler extends ThreadedHttpRequestHandler {
private final LogReader logReader;
- private static final long MB = 1024*1024;
+ private static final long MB = 1024 * 1024;
@Inject
public LogHandler(Executor executor, LogHandlerConfig config) {
@@ -45,11 +46,11 @@ public class LogHandler extends ThreadedHttpRequestHandler {
return new AsyncHttpResponse(200) {
@Override
+ public long maxPendingBytes() { return MB; }
+ @Override
public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) {
- try {
- OutputStream blockingOutput = new MaxPendingContentChannelOutputStream(networkChannel, 1*MB);
- logReader.writeLogs(blockingOutput, from, to, hostname);
- blockingOutput.close();
+ try (output) {
+ logReader.writeLogs(output, from, to, hostname);
}
catch (Throwable t) {
log.log(Level.WARNING, "Failed reading logs from " + from + " to " + to, t);
@@ -62,74 +63,5 @@ public class LogHandler extends ThreadedHttpRequestHandler {
}
- private static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
- private final long maxPending;
- private final AtomicLong sent = new AtomicLong(0);
- private final AtomicLong acked = new AtomicLong(0);
-
- public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
- super(endpoint);
- this.maxPending = maxPending;
- }
-
- private long pendingBytes() {
- return sent.get() - acked.get();
- }
-
- private class TrackCompletition implements CompletionHandler {
- private final long written;
- private final AtomicBoolean replied = new AtomicBoolean(false);
- TrackCompletition(long written) {
- this.written = written;
- sent.addAndGet(written);
- }
- @Override
- public void completed() {
- if (!replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
-
- @Override
- public void failed(Throwable t) {
- if (!replied.getAndSet(true)) {
- acked.addAndGet(written);
- }
- }
- }
- @Override
- public void send(ByteBuffer src) throws IOException {
- try {
- stallWhilePendingAbove(maxPending);
- } catch (InterruptedException ignored) {
- throw new IOException("Interrupted waiting for IO");
- }
- CompletionHandler pendingTracker = new TrackCompletition(src.remaining());
- try {
- send(src, pendingTracker);
- } catch (Throwable throwable) {
- pendingTracker.failed(throwable);
- throw throwable;
- }
- }
-
- private void stallWhilePendingAbove(long pending) throws InterruptedException {
- while (pendingBytes() > pending) {
- Thread.sleep(1);
- }
- }
-
- @Override
- public void flush() throws IOException {
- super.flush();
- try {
- stallWhilePendingAbove(0);
- }
- catch (InterruptedException e) {
- throw new IOException("Interrupted waiting for IO");
- }
- }
-
- }
}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java b/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java
index a6042c541c0..5df40a90fe6 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/HttpResponse.java
@@ -40,6 +40,9 @@ public abstract class HttpResponse {
/** Marshals this response to the network layer. The caller is responsible for flushing and closing outputStream. */
public abstract void render(OutputStream outputStream) throws IOException;
+ /** The amount of content bytes this response may have in-flight (if positive) before response rendering blocks. */
+ public long maxPendingBytes() { return -1; }
+
/**
* Returns the numeric HTTP status code, e.g. 200, 404 and so on.
*
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
index 9687697d6f6..5b8fe907293 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedHttpRequestHandler.java
@@ -9,6 +9,10 @@ import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.handler.ResponseHandler;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.io.IOException;
@@ -97,7 +101,8 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler
LoggingCompletionHandler logOnCompletion = null;
ContentChannelOutputStream output = null;
try {
- output = new ContentChannelOutputStream(channel);
+ output = httpResponse.maxPendingBytes() > 0 ? new MaxPendingContentChannelOutputStream(channel, httpResponse.maxPendingBytes())
+ : new ContentChannelOutputStream(channel);
logOnCompletion = createLoggingCompletionHandler(startTime, System.currentTimeMillis(),
httpResponse, request, output);
@@ -247,4 +252,82 @@ public abstract class ThreadedHttpRequestHandler extends ThreadedRequestHandler
return (com.yahoo.jdisc.http.HttpRequest) request;
}
+
+ /**
+ * @author baldersheim
+ */
+ static class MaxPendingContentChannelOutputStream extends ContentChannelOutputStream {
+ private final long maxPending;
+ private final AtomicLong sent = new AtomicLong(0);
+ private final AtomicLong acked = new AtomicLong(0);
+
+ public MaxPendingContentChannelOutputStream(ContentChannel endpoint, long maxPending) {
+ super(endpoint);
+ this.maxPending = maxPending;
+ }
+
+ private long pendingBytes() {
+ return sent.get() - acked.get();
+ }
+
+ private class TrackCompletion implements CompletionHandler {
+
+ private final long written;
+ private final AtomicBoolean replied = new AtomicBoolean(false);
+
+ TrackCompletion(long written) {
+ this.written = written;
+ sent.addAndGet(written);
+ }
+
+ @Override
+ public void completed() {
+ if ( ! replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ if ( ! replied.getAndSet(true)) {
+ acked.addAndGet(written);
+ }
+ }
+ }
+
+ @Override
+ public void send(ByteBuffer src) throws IOException {
+ try {
+ stallWhilePendingAbove(maxPending);
+ } catch (InterruptedException ignored) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ CompletionHandler pendingTracker = new TrackCompletion(src.remaining());
+ try {
+ send(src, pendingTracker);
+ } catch (Throwable throwable) {
+ pendingTracker.failed(throwable);
+ throw throwable;
+ }
+ }
+
+ private void stallWhilePendingAbove(long pending) throws InterruptedException {
+ while (pendingBytes() > pending) {
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ try {
+ stallWhilePendingAbove(0);
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for IO");
+ }
+ }
+
+ }
+
}
diff --git a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java
index afe57579a97..38683c75375 100644
--- a/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java
+++ b/container-core/src/test/java/com/yahoo/container/handler/LogHandlerTest.java
@@ -2,6 +2,7 @@
package com.yahoo.container.handler;
import com.yahoo.container.jdisc.AsyncHttpResponse;
+import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.jdisc.handler.ReadableContentChannel;
import com.yahoo.yolean.Exceptions;
@@ -28,7 +29,7 @@ public class LogHandlerTest {
String uri = "http://myhost.com:1111/logs?from=1000&to=2000";
AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET));
ReadableContentChannel out = new ReadableContentChannel();
- new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start();
+ new Thread(() -> Exceptions.uncheck(() -> response.render(new ContentChannelOutputStream(out), out, null))).start();
String expectedResponse = "newer log";
assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8));
}
@@ -37,7 +38,7 @@ public class LogHandlerTest {
String uri = "http://myhost.com:1111/logs?from=0&to=1000";
AsyncHttpResponse response = logHandler.handle(HttpRequest.createTestRequest(uri, com.yahoo.jdisc.http.HttpRequest.Method.GET));
ReadableContentChannel out = new ReadableContentChannel();
- new Thread(() -> Exceptions.uncheck(() -> response.render(null, out, null))).start();
+ new Thread(() -> Exceptions.uncheck(() -> response.render(new ContentChannelOutputStream(out), out, null))).start();
String expectedResponse = "older log";
assertEquals(expectedResponse, new String(out.toStream().readAllBytes(), UTF_8));
}
diff --git a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
index 331c536a531..cfea0f5c38b 100644
--- a/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
+++ b/container-core/src/test/java/com/yahoo/container/jdisc/ThreadedRequestHandlerTestCase.java
@@ -1,22 +1,30 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc;
+import com.yahoo.container.jdisc.ThreadedHttpRequestHandler.MaxPendingContentChannelOutputStream;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.application.ContainerBuilder;
import com.yahoo.jdisc.handler.*;
import com.yahoo.jdisc.test.TestDriver;
-import org.junit.Ignore;
+import com.yahoo.yolean.Exceptions;
import org.junit.Test;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
@@ -353,4 +361,44 @@ public class ThreadedRequestHandlerTestCase {
latch.countDown();
}
}
+
+ @Test
+ public void testMaxPendingOutputStream() throws IOException, ExecutionException, InterruptedException {
+ ReadableContentChannel buffer = new ReadableContentChannel();
+ MaxPendingContentChannelOutputStream limited = new MaxPendingContentChannelOutputStream(buffer, 2);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ limited.send(ByteBuffer.allocate(2));
+ limited.send(ByteBuffer.allocate(1)); // 2 is not > 2, so OK.
+
+ // Next write will block.
+ Future<?> future = executor.submit(() -> Exceptions.uncheck(() -> limited.send(ByteBuffer.allocate(0))));
+ try {
+ future.get(100, TimeUnit.MILLISECONDS);
+ fail("Should not be able to write now");
+ }
+ catch (TimeoutException expected) { }
+
+ // Free buffer capacity, so write completes, then drain buffer.
+ assertEquals(2, buffer.read().capacity());
+ future.get();
+ buffer.close(null);
+ assertEquals(1, buffer.read().capacity());
+ assertEquals(0, buffer.read().capacity());
+ assertNull(buffer.read());
+
+ // Buffer is closed, so further writes fail. This does not count towards pending bytes.
+ try {
+ limited.send(ByteBuffer.allocate(3));
+ fail("Should throw");
+ }
+ catch (IOException expected) { }
+ try {
+ limited.send(ByteBuffer.allocate(3));
+ fail("Should throw");
+ }
+ catch (IOException expected) { }
+ }
+
}