summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-10-07 12:09:50 +0200
committerGitHub <noreply@github.com>2021-10-07 12:09:50 +0200
commit4b837462afa5ae9adbebc10c226637ae04e05007 (patch)
tree70ff872463f0048c9d50abfed04fe03fba10d296 /container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
parent3b8c5f1e967c6354cae762a95cc1489ef91f1e55 (diff)
parent47b73921208a52d8f356be4ff87ed0f679791827 (diff)
Merge pull request #19452 from vespa-engine/bjorncs/jetty-async-listener
Refactor async completion logic for read and write path [run-systemtest]
Diffstat (limited to 'container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java')
-rw-r--r--container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java220
1 files changed, 94 insertions, 126 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
index 849cf63fb8b..1def9ccaab1 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jdisc.http.server.jetty;
-import com.google.common.base.Preconditions;
import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
@@ -9,6 +8,7 @@ import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -25,8 +25,12 @@ import java.util.logging.Logger;
* error reporting might be async.
* Since we have tests that first reports errors and then closes the response content,
* it's important that errors are delivered synchronously.
+ *
+ * @author Tony Vaagenes
+ * @author Oyvind Bakksjo
+ * @author bjorncs
*/
-class ServletRequestReader implements ReadListener {
+class ServletRequestReader {
private enum State {
READING, ALL_DATA_READ, REQUEST_CONTENT_CLOSED
@@ -38,12 +42,12 @@ class ServletRequestReader implements ReadListener {
private final Object monitor = new Object();
- private final ServletInputStream servletInputStream;
+ private final ServletInputStream in;
private final ContentChannel requestContentChannel;
-
private final Janitor janitor;
private final RequestMetricReporter metricReporter;
+ private Throwable errorDuringRead;
private int bytesRead;
/**
@@ -87,82 +91,91 @@ class ServletRequestReader implements ReadListener {
* If calls to those methods does not close the request content channel immediately,
* there is some outstanding completion callback that will later come in and complete the request.
*/
- final CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
+ private final CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
- public ServletRequestReader(
- ServletInputStream servletInputStream,
+ ServletRequestReader(
+ ServletInputStream in,
ContentChannel requestContentChannel,
Janitor janitor,
RequestMetricReporter metricReporter) {
-
- Preconditions.checkNotNull(servletInputStream);
- Preconditions.checkNotNull(requestContentChannel);
- Preconditions.checkNotNull(janitor);
- Preconditions.checkNotNull(metricReporter);
-
- this.servletInputStream = servletInputStream;
- this.requestContentChannel = requestContentChannel;
- this.janitor = janitor;
- this.metricReporter = metricReporter;
+ this.in = Objects.requireNonNull(in);
+ this.requestContentChannel = Objects.requireNonNull(requestContentChannel);
+ this.janitor = Objects.requireNonNull(janitor);
+ this.metricReporter = Objects.requireNonNull(metricReporter);
+ in.setReadListener(new Listener());
}
- @Override
- public void onDataAvailable() throws IOException {
- while (servletInputStream.isReady()) {
- final byte[] buffer = new byte[BUFFER_SIZE_BYTES];
- int numBytesRead;
+ CompletableFuture<Void> finishedFuture() { return finishedFuture; }
- synchronized (monitor) {
- numBytesRead = servletInputStream.read(buffer);
- if (numBytesRead < 0) {
- // End of stream; there should be no more data available, ever.
- return;
- }
- if (state != State.READING) {
- //We have a failure, so no point in giving the buffer to the user.
- assert finishedFuture.isCompletedExceptionally();
- return;
+ private class Listener implements ReadListener {
+
+ @Override
+ public void onDataAvailable() throws IOException {
+ while (in.isReady()) {
+ final byte[] buffer = new byte[BUFFER_SIZE_BYTES];
+ int numBytesRead;
+
+ synchronized (monitor) {
+ numBytesRead = in.read(buffer);
+ if (numBytesRead < 0) {
+ // End of stream; there should be no more data available, ever.
+ return;
+ }
+ if (state != State.READING) {
+ //We have a failure, so no point in giving the buffer to the user.
+ assert finishedFuture.isCompletedExceptionally();
+ return;
+ }
+ //wait for both
+ // - requestContentChannel.write to finish
+ // - the write completion handler to be called
+ numberOfOutstandingUserCalls += 2;
+ bytesRead += numBytesRead;
}
- //wait for both
- // - requestContentChannel.write to finish
- // - the write completion handler to be called
- numberOfOutstandingUserCalls += 2;
- bytesRead += numBytesRead;
- }
- try {
- requestContentChannel.write(ByteBuffer.wrap(buffer, 0, numBytesRead), writeCompletionHandler);
- metricReporter.successfulRead(numBytesRead);
- }
- catch (Throwable t) {
- finishedFuture.completeExceptionally(t);
- }
- finally {
- //decrease due to this method completing.
- decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally();
+ try {
+ requestContentChannel.write(ByteBuffer.wrap(buffer, 0, numBytesRead), new CompletionHandler() {
+ @Override
+ public void completed() {
+ decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally();
+ }
+ @Override
+ public void failed(final Throwable t) {
+ decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally();
+ finishedFuture.completeExceptionally(t);
+ }
+ });
+ metricReporter.successfulRead(numBytesRead);
+ } catch (Throwable t) {
+ finishedFuture.completeExceptionally(t);
+ } finally {
+ //decrease due to this method completing.
+ decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally();
+ }
}
}
+
+ @Override public void onError(final Throwable t) { fail(t); }
+ @Override public void onAllDataRead() { doneReading(null); }
+ }
+
+ void fail(Throwable t) {
+ doneReading(t);
+ finishedFuture.completeExceptionally(t);
}
private void decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally() {
boolean shouldCloseRequestContentChannel;
-
synchronized (monitor) {
assertStateNotEquals(state, State.REQUEST_CONTENT_CLOSED);
-
-
numberOfOutstandingUserCalls -= 1;
-
- shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0 &&
- (finishedFuture.isDone() || state == State.ALL_DATA_READ);
-
+ shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0 && state == State.ALL_DATA_READ;
if (shouldCloseRequestContentChannel) {
state = State.REQUEST_CONTENT_CLOSED;
}
}
-
if (shouldCloseRequestContentChannel) {
- janitor.scheduleTask(this::closeCompletionHandler_noThrow);
+ janitor.scheduleTask(this::closeRequestContentChannel);
}
}
@@ -178,22 +191,14 @@ class ServletRequestReader implements ReadListener {
}
}
- @Override
- public void onAllDataRead() {
- doneReading();
- }
-
- private void doneReading() {
- final boolean shouldCloseRequestContentChannel;
-
+ private void doneReading(Throwable t) {
+ boolean shouldCloseRequestContentChannel;
int bytesRead;
- synchronized (monitor) {
- if (state != State.READING) {
- return;
- }
+ synchronized (monitor) {
+ errorDuringRead = t;
+ if (state != State.READING) return;
state = State.ALL_DATA_READ;
-
shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0;
if (shouldCloseRequestContentChannel) {
state = State.REQUEST_CONTENT_CLOSED;
@@ -202,69 +207,32 @@ class ServletRequestReader implements ReadListener {
}
if (shouldCloseRequestContentChannel) {
- closeCompletionHandler_noThrow();
+ closeRequestContentChannel();
}
-
metricReporter.contentSize(bytesRead);
}
- private void closeCompletionHandler_noThrow() {
- //Cannot complete finishedFuture directly in completed(), as any exceptions after this fact will be ignored.
- // E.g.
- // close(CompletionHandler completionHandler) {
- // completionHandler.completed();
- // throw new RuntimeException
- // }
-
- CompletableFuture<Void> completedCalledFuture = new CompletableFuture<>();
-
- CompletionHandler closeCompletionHandler = new CompletionHandler() {
- @Override
- public void completed() {
- completedCalledFuture.complete(null);
- }
-
- @Override
- public void failed(final Throwable t) {
- finishedFuture.completeExceptionally(t);
- }
- };
-
+ private void closeRequestContentChannel() {
+ Throwable readError;
+ synchronized (monitor) { readError = this.errorDuringRead; }
try {
- requestContentChannel.close(closeCompletionHandler);
- //if close did not cause an exception,
- // is it safe to pipe the result of the completionHandlerInvokedFuture into finishedFuture
- completedCalledFuture.whenComplete(this::setFinishedFuture);
- } catch (final Throwable t) {
+ if (readError != null) requestContentChannel.onError(readError);
+ //Cannot complete finishedFuture directly in completed(), as any exceptions after this fact will be ignored.
+ // E.g.
+ // close(CompletionHandler completionHandler) {
+ // completionHandler.completed();
+ // throw new RuntimeException
+ // }
+ CompletableFuture<Void> completedCalledFuture = new CompletableFuture<>();
+ requestContentChannel.close(new CompletionHandler() {
+ @Override public void completed() { completedCalledFuture.complete(null); }
+ @Override public void failed(Throwable t) { finishedFuture.completeExceptionally(t); }
+ });
+ // Propagate successful completion as close did not throw an exception
+ completedCalledFuture.whenComplete((__, ___) -> finishedFuture.complete(null));
+ } catch (Throwable t) {
finishedFuture.completeExceptionally(t);
}
}
- private void setFinishedFuture(Void result, Throwable throwable) {
- if (throwable != null) {
- finishedFuture.completeExceptionally(throwable);
- } else {
- finishedFuture.complete(null);
- }
- }
-
- @Override
- public void onError(final Throwable t) {
- finishedFuture.completeExceptionally(t);
- requestContentChannel.onError(t);
- doneReading();
- }
-
- private final CompletionHandler writeCompletionHandler = new CompletionHandler() {
- @Override
- public void completed() {
- decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally();
- }
-
- @Override
- public void failed(final Throwable t) {
- finishedFuture.completeExceptionally(t);
- decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally();
- }
- };
}