diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-10-07 12:09:50 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-07 12:09:50 +0200 |
commit | 4b837462afa5ae9adbebc10c226637ae04e05007 (patch) | |
tree | 70ff872463f0048c9d50abfed04fe03fba10d296 /container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java | |
parent | 3b8c5f1e967c6354cae762a95cc1489ef91f1e55 (diff) | |
parent | 47b73921208a52d8f356be4ff87ed0f679791827 (diff) |
Merge pull request #19452 from vespa-engine/bjorncs/jetty-async-listener
Refactor async completion logic for read and write path [run-systemtest]
Diffstat (limited to 'container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java')
-rw-r--r-- | container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java | 220 |
1 files changed, 94 insertions, 126 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java index 849cf63fb8b..1def9ccaab1 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletRequestReader.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.http.server.jetty; -import com.google.common.base.Preconditions; import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; @@ -9,6 +8,7 @@ import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.logging.Level; import java.util.logging.Logger; @@ -25,8 +25,12 @@ import java.util.logging.Logger; * error reporting might be async. * Since we have tests that first reports errors and then closes the response content, * it's important that errors are delivered synchronously. + * + * @author Tony Vaagenes + * @author Oyvind Bakksjo + * @author bjorncs */ -class ServletRequestReader implements ReadListener { +class ServletRequestReader { private enum State { READING, ALL_DATA_READ, REQUEST_CONTENT_CLOSED @@ -38,12 +42,12 @@ class ServletRequestReader implements ReadListener { private final Object monitor = new Object(); - private final ServletInputStream servletInputStream; + private final ServletInputStream in; private final ContentChannel requestContentChannel; - private final Janitor janitor; private final RequestMetricReporter metricReporter; + private Throwable errorDuringRead; private int bytesRead; /** @@ -87,82 +91,91 @@ class ServletRequestReader implements ReadListener { * If calls to those methods does not close the request content channel immediately, * there is some outstanding completion callback that will later come in and complete the request. */ - final CompletableFuture<Void> finishedFuture = new CompletableFuture<>(); + private final CompletableFuture<Void> finishedFuture = new CompletableFuture<>(); - public ServletRequestReader( - ServletInputStream servletInputStream, + ServletRequestReader( + ServletInputStream in, ContentChannel requestContentChannel, Janitor janitor, RequestMetricReporter metricReporter) { - - Preconditions.checkNotNull(servletInputStream); - Preconditions.checkNotNull(requestContentChannel); - Preconditions.checkNotNull(janitor); - Preconditions.checkNotNull(metricReporter); - - this.servletInputStream = servletInputStream; - this.requestContentChannel = requestContentChannel; - this.janitor = janitor; - this.metricReporter = metricReporter; + this.in = Objects.requireNonNull(in); + this.requestContentChannel = Objects.requireNonNull(requestContentChannel); + this.janitor = Objects.requireNonNull(janitor); + this.metricReporter = Objects.requireNonNull(metricReporter); + in.setReadListener(new Listener()); } - @Override - public void onDataAvailable() throws IOException { - while (servletInputStream.isReady()) { - final byte[] buffer = new byte[BUFFER_SIZE_BYTES]; - int numBytesRead; + CompletableFuture<Void> finishedFuture() { return finishedFuture; } - synchronized (monitor) { - numBytesRead = servletInputStream.read(buffer); - if (numBytesRead < 0) { - // End of stream; there should be no more data available, ever. - return; - } - if (state != State.READING) { - //We have a failure, so no point in giving the buffer to the user. - assert finishedFuture.isCompletedExceptionally(); - return; + private class Listener implements ReadListener { + + @Override + public void onDataAvailable() throws IOException { + while (in.isReady()) { + final byte[] buffer = new byte[BUFFER_SIZE_BYTES]; + int numBytesRead; + + synchronized (monitor) { + numBytesRead = in.read(buffer); + if (numBytesRead < 0) { + // End of stream; there should be no more data available, ever. + return; + } + if (state != State.READING) { + //We have a failure, so no point in giving the buffer to the user. + assert finishedFuture.isCompletedExceptionally(); + return; + } + //wait for both + // - requestContentChannel.write to finish + // - the write completion handler to be called + numberOfOutstandingUserCalls += 2; + bytesRead += numBytesRead; } - //wait for both - // - requestContentChannel.write to finish - // - the write completion handler to be called - numberOfOutstandingUserCalls += 2; - bytesRead += numBytesRead; - } - try { - requestContentChannel.write(ByteBuffer.wrap(buffer, 0, numBytesRead), writeCompletionHandler); - metricReporter.successfulRead(numBytesRead); - } - catch (Throwable t) { - finishedFuture.completeExceptionally(t); - } - finally { - //decrease due to this method completing. - decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally(); + try { + requestContentChannel.write(ByteBuffer.wrap(buffer, 0, numBytesRead), new CompletionHandler() { + @Override + public void completed() { + decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally(); + } + @Override + public void failed(final Throwable t) { + decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally(); + finishedFuture.completeExceptionally(t); + } + }); + metricReporter.successfulRead(numBytesRead); + } catch (Throwable t) { + finishedFuture.completeExceptionally(t); + } finally { + //decrease due to this method completing. + decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally(); + } } } + + @Override public void onError(final Throwable t) { fail(t); } + @Override public void onAllDataRead() { doneReading(null); } + } + + void fail(Throwable t) { + doneReading(t); + finishedFuture.completeExceptionally(t); } private void decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally() { boolean shouldCloseRequestContentChannel; - synchronized (monitor) { assertStateNotEquals(state, State.REQUEST_CONTENT_CLOSED); - - numberOfOutstandingUserCalls -= 1; - - shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0 && - (finishedFuture.isDone() || state == State.ALL_DATA_READ); - + shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0 && state == State.ALL_DATA_READ; if (shouldCloseRequestContentChannel) { state = State.REQUEST_CONTENT_CLOSED; } } - if (shouldCloseRequestContentChannel) { - janitor.scheduleTask(this::closeCompletionHandler_noThrow); + janitor.scheduleTask(this::closeRequestContentChannel); } } @@ -178,22 +191,14 @@ class ServletRequestReader implements ReadListener { } } - @Override - public void onAllDataRead() { - doneReading(); - } - - private void doneReading() { - final boolean shouldCloseRequestContentChannel; - + private void doneReading(Throwable t) { + boolean shouldCloseRequestContentChannel; int bytesRead; - synchronized (monitor) { - if (state != State.READING) { - return; - } + synchronized (monitor) { + errorDuringRead = t; + if (state != State.READING) return; state = State.ALL_DATA_READ; - shouldCloseRequestContentChannel = numberOfOutstandingUserCalls == 0; if (shouldCloseRequestContentChannel) { state = State.REQUEST_CONTENT_CLOSED; @@ -202,69 +207,32 @@ class ServletRequestReader implements ReadListener { } if (shouldCloseRequestContentChannel) { - closeCompletionHandler_noThrow(); + closeRequestContentChannel(); } - metricReporter.contentSize(bytesRead); } - private void closeCompletionHandler_noThrow() { - //Cannot complete finishedFuture directly in completed(), as any exceptions after this fact will be ignored. - // E.g. - // close(CompletionHandler completionHandler) { - // completionHandler.completed(); - // throw new RuntimeException - // } - - CompletableFuture<Void> completedCalledFuture = new CompletableFuture<>(); - - CompletionHandler closeCompletionHandler = new CompletionHandler() { - @Override - public void completed() { - completedCalledFuture.complete(null); - } - - @Override - public void failed(final Throwable t) { - finishedFuture.completeExceptionally(t); - } - }; - + private void closeRequestContentChannel() { + Throwable readError; + synchronized (monitor) { readError = this.errorDuringRead; } try { - requestContentChannel.close(closeCompletionHandler); - //if close did not cause an exception, - // is it safe to pipe the result of the completionHandlerInvokedFuture into finishedFuture - completedCalledFuture.whenComplete(this::setFinishedFuture); - } catch (final Throwable t) { + if (readError != null) requestContentChannel.onError(readError); + //Cannot complete finishedFuture directly in completed(), as any exceptions after this fact will be ignored. + // E.g. + // close(CompletionHandler completionHandler) { + // completionHandler.completed(); + // throw new RuntimeException + // } + CompletableFuture<Void> completedCalledFuture = new CompletableFuture<>(); + requestContentChannel.close(new CompletionHandler() { + @Override public void completed() { completedCalledFuture.complete(null); } + @Override public void failed(Throwable t) { finishedFuture.completeExceptionally(t); } + }); + // Propagate successful completion as close did not throw an exception + completedCalledFuture.whenComplete((__, ___) -> finishedFuture.complete(null)); + } catch (Throwable t) { finishedFuture.completeExceptionally(t); } } - private void setFinishedFuture(Void result, Throwable throwable) { - if (throwable != null) { - finishedFuture.completeExceptionally(throwable); - } else { - finishedFuture.complete(null); - } - } - - @Override - public void onError(final Throwable t) { - finishedFuture.completeExceptionally(t); - requestContentChannel.onError(t); - doneReading(); - } - - private final CompletionHandler writeCompletionHandler = new CompletionHandler() { - @Override - public void completed() { - decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally(); - } - - @Override - public void failed(final Throwable t) { - finishedFuture.completeExceptionally(t); - decreaseOutstandingUserCallsAndCloseRequestContentChannelConditionally(); - } - }; } |