aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_http_service
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@yahoo-inc.com>2016-11-30 14:26:36 +0100
committerBjørn Christian Seime <bjorncs@yahoo-inc.com>2016-11-30 14:43:54 +0100
commite958aee5adca061d2650f9f5378ce93a9e92ae7d (patch)
tree92f35a9ef3a990b2ed98606d917f0da9fcc61838 /jdisc_http_service
parent53ba0846d7775904843e81693706bd2f82fcbf21 (diff)
Register Servlet WriteListener during request dispatch
Diffstat (limited to 'jdisc_http_service')
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java1
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java63
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java3
3 files changed, 50 insertions, 17 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java
index b42ed90569e..2351a4742cf 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java
@@ -70,6 +70,7 @@ class HttpRequestDispatch {
this.async = servletRequest.startAsync();
async.setTimeout(0);
+ servletResponseController.registerWriteListener();
}
public void dispatch() throws IOException {
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
index 0cb44e445d2..5f248db9fe3 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
@@ -34,7 +34,8 @@ public class ServletOutputStreamWriter {
private enum State {
NOT_STARTED,
WAITING_FOR_WRITE_POSSIBLE_CALLBACK,
- WAITING_FOR_BUFFER,
+ WAITING_FOR_FIRST_BUFFER,
+ WAITING_FOR_SUBSEQUENT_BUFFER,
WRITING_BUFFERS,
FINISHED_OR_ERROR
}
@@ -76,12 +77,25 @@ public class ServletOutputStreamWriter {
this.metricReporter = metricReporter;
}
+ public void registerWriteListener() {
+ synchronized (monitor) {
+ assertStateIs(state, State.NOT_STARTED);
+ outputStream.setWriteListener(writeListener);
+ }
+ }
+
public void sendErrorContentAndCloseAsync(ByteBuffer errorContent) {
synchronized (monitor) {
// Assert that no content has been written as it is too late to write error response if the response is committed.
- assertStateIs(state, State.NOT_STARTED);
- writeBuffer(errorContent, null);
- close(null);
+ switch (state) {
+ case NOT_STARTED:
+ case WAITING_FOR_FIRST_BUFFER:
+ writeBuffer(errorContent, null);
+ close(null);
+ return;
+ default:
+ throw createAndLogAssertionError("Invalid state: " + state);
+ }
}
}
@@ -100,12 +114,12 @@ public class ServletOutputStreamWriter {
switch (state) {
case NOT_STARTED:
state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK;
- outputStream.setWriteListener(writeListener);
break;
case WAITING_FOR_WRITE_POSSIBLE_CALLBACK:
case WRITING_BUFFERS:
break;
- case WAITING_FOR_BUFFER:
+ case WAITING_FOR_FIRST_BUFFER:
+ case WAITING_FOR_SUBSEQUENT_BUFFER:
thisThreadShouldWrite = true;
state = State.WRITING_BUFFERS;
break;
@@ -145,7 +159,7 @@ public class ServletOutputStreamWriter {
contentPart = responseContentQueue.pollFirst();
if (contentPart == null && lastOperationWasFlush) {
- state = State.WAITING_FOR_BUFFER;
+ state = State.WAITING_FOR_SUBSEQUENT_BUFFER;
return;
}
}
@@ -239,12 +253,16 @@ public class ServletOutputStreamWriter {
private static void assertStateIs(State currentState, State expectedState) {
if (currentState != expectedState) {
- AssertionError error = new AssertionError("Expected state " + expectedState + ", got state " + currentState);
- log.log(Level.WARNING, "Assertion failed.", error);
- throw error;
+ throw createAndLogAssertionError("Expected state " + expectedState + ", got state " + currentState);
}
}
+ private static AssertionError createAndLogAssertionError(String detailedMessage) {
+ AssertionError error = new AssertionError(detailedMessage);
+ log.log(Level.WARNING, "Assertion failed.", error);
+ return error;
+ }
+
public void fail(Throwable t) {
setFinished(Optional.of(t));
}
@@ -252,16 +270,27 @@ public class ServletOutputStreamWriter {
private final WriteListener writeListener = new WriteListener() {
@Override
public void onWritePossible() throws IOException {
+ boolean shouldWriteBuffers = false;
synchronized (monitor) {
- if (state == State.FINISHED_OR_ERROR) {
- return;
+ switch (state) {
+ case NOT_STARTED:
+ state = State.WAITING_FOR_FIRST_BUFFER;
+ break;
+ case WAITING_FOR_WRITE_POSSIBLE_CALLBACK:
+ state = State.WRITING_BUFFERS;
+ shouldWriteBuffers = true;
+ break;
+ case FINISHED_OR_ERROR:
+ return;
+ case WAITING_FOR_FIRST_BUFFER:
+ case WAITING_FOR_SUBSEQUENT_BUFFER:
+ case WRITING_BUFFERS:
+ throw createAndLogAssertionError("Invalid state: " + state);
}
-
- assertStateIs(state, State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK);
- state = State.WRITING_BUFFERS;
}
-
- writeBuffersInQueueToOutputStream();
+ if (shouldWriteBuffers) {
+ writeBuffersInQueueToOutputStream();
+ }
}
@Override
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java
index 1c20db62d82..eaafb1b1d78 100644
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java
+++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletResponseController.java
@@ -67,6 +67,9 @@ public class ServletResponseController {
new ServletOutputStreamWriter(servletResponse.getOutputStream(), executor, metricReporter);
}
+ public void registerWriteListener() {
+ servletOutputStreamWriter.registerWriteListener();
+ }
private static int getStatusCode(Throwable t) {
if (t instanceof BindingNotFoundException) {