diff options
author | Bjørn Christian Seime <bjorncs@yahoo-inc.com> | 2016-11-30 14:26:36 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@yahoo-inc.com> | 2016-11-30 14:43:54 +0100 |
commit | e958aee5adca061d2650f9f5378ce93a9e92ae7d (patch) | |
tree | 92f35a9ef3a990b2ed98606d917f0da9fcc61838 /jdisc_http_service | |
parent | 53ba0846d7775904843e81693706bd2f82fcbf21 (diff) |
Register Servlet WriteListener during request dispatch
Diffstat (limited to 'jdisc_http_service')
3 files changed, 50 insertions, 17 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java index b42ed90569e..2351a4742cf 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java @@ -70,6 +70,7 @@ class HttpRequestDispatch { this.async = servletRequest.startAsync(); async.setTimeout(0); + servletResponseController.registerWriteListener(); } public void dispatch() throws IOException { diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java index 0cb44e445d2..5f248db9fe3 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java @@ -34,7 +34,8 @@ public class ServletOutputStreamWriter { private enum State { NOT_STARTED, WAITING_FOR_WRITE_POSSIBLE_CALLBACK, - WAITING_FOR_BUFFER, + WAITING_FOR_FIRST_BUFFER, + WAITING_FOR_SUBSEQUENT_BUFFER, WRITING_BUFFERS, FINISHED_OR_ERROR } @@ -76,12 +77,25 @@ public class ServletOutputStreamWriter { this.metricReporter = metricReporter; } + public void registerWriteListener() { + synchronized (monitor) { + assertStateIs(state, State.NOT_STARTED); + outputStream.setWriteListener(writeListener); + } + } + public void sendErrorContentAndCloseAsync(ByteBuffer errorContent) { synchronized (monitor) { // Assert that no content has been written as it is too late to write error response if the response is committed. - assertStateIs(state, State.NOT_STARTED); - writeBuffer(errorContent, null); - close(null); + switch (state) { + case NOT_STARTED: + case WAITING_FOR_FIRST_BUFFER: + writeBuffer(errorContent, null); + close(null); + return; + default: + throw createAndLogAssertionError("Invalid state: " + state); + } } } @@ -100,12 +114,12 @@ public class ServletOutputStreamWriter { switch (state) { case NOT_STARTED: state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK; - outputStream.setWriteListener(writeListener); break; case WAITING_FOR_WRITE_POSSIBLE_CALLBACK: case WRITING_BUFFERS: break; - case WAITING_FOR_BUFFER: + case WAITING_FOR_FIRST_BUFFER: + case WAITING_FOR_SUBSEQUENT_BUFFER: thisThreadShouldWrite = true; state = State.WRITING_BUFFERS; break; @@ -145,7 +159,7 @@ public class ServletOutputStreamWriter { contentPart = responseContentQueue.pollFirst(); if (contentPart == null && lastOperationWasFlush) { - state = State.WAITING_FOR_BUFFER; + state = State.WAITING_FOR_SUBSEQUENT_BUFFER; return; } } @@ -239,12 +253,16 @@ public class ServletOutputStreamWriter { private static void assertStateIs(State currentState, State expectedState) { if (currentState != expectedState) { - AssertionError error = new AssertionError("Expected state " + expectedState + ", got state " + currentState); - log.log(Level.WARNING, "Assertion failed.", error); - throw error; + throw createAndLogAssertionError("Expected state " + expectedState + ", got state " + currentState); } } + private static AssertionError createAndLogAssertionError(String detailedMessage) { + AssertionError error = new AssertionError(detailedMessage); + log.log(Level.WARNING, "Assertion failed.", error); + return error; + } + public void fail(Throwable t) { setFinished(Optional.of(t)); } @@ -252,16 +270,27 @@ public class ServletOutputStreamWriter { private final WriteListener writeListener = new WriteListener() { @Override public void onWritePossible() throws IOException { + boolean shouldWriteBuffers = false; synchronized (monitor) { - if (state == State.FINISHED_OR_ERROR) { - return; + switch (state) { + case NOT_STARTED: + state = State.WAITING_FOR_FIRST_BUFFER; + break; + case WAITING_FOR_WRITE_POSSIBLE_CALLBACK: + state = State.WRITING_BUFFERS; + shouldWriteBuffers = true; + break; + case FINISHED_OR_ERROR: + return; + case WAITING_FOR_FIRST_BUFFER: + case WAITING_FOR_SUBSEQUENT_BUFFER: + case WRITING_BUFFERS: + throw createAndLogAssertionError("Invalid state: " + state); } - - assertStateIs(state, State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK); - state = State.WRITING_BUFFERS; } - - writeBuffersInQueueToOutputStream(); + if (shouldWriteBuffers) { + writeBuffersInQueueToOutputStream(); + } } @Override diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java index 1c20db62d82..eaafb1b1d78 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java @@ -67,6 +67,9 @@ public class ServletResponseController { new ServletOutputStreamWriter(servletResponse.getOutputStream(), executor, metricReporter); } + public void registerWriteListener() { + servletOutputStreamWriter.registerWriteListener(); + } private static int getStatusCode(Throwable t) { if (t instanceof BindingNotFoundException) { |