aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java')
-rw-r--r--container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java299
1 files changed, 299 insertions, 0 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
new file mode 100644
index 00000000000..b4d03385c3b
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
@@ -0,0 +1,299 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.http.server.jetty;
+
+import com.yahoo.jdisc.handler.CompletionHandler;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.jdisc.http.server.jetty.CompletionHandlerUtils.NOOP_COMPLETION_HANDLER;
+
+/**
+ * @author Tony Vaagenes
+ * @author bjorncs
+ */
+public class ServletOutputStreamWriter {
+ /** Rules:
+ * 1) Don't modify the output stream without isReady returning true (write/flush/close).
+ * Multiple modification calls without interleaving isReady calls are not allowed.
+ * 2) If isReady returned false, no other calls should be made until the write listener is invoked.
+ * 3) If the write listener sees isReady == false, it must not do any modifications before its next invocation.
+ */
+
+
+ private enum State {
+ NOT_STARTED,
+ WAITING_FOR_WRITE_POSSIBLE_CALLBACK,
+ WAITING_FOR_BUFFER,
+ WRITING_BUFFERS,
+ FINISHED_OR_ERROR
+ }
+
+ private static final Logger log = Logger.getLogger(ServletOutputStreamWriter.class.getName());
+
+ // If so, application code could fake a close by writing such a byte buffer.
+ // The problem can be solved by filtering out zero-length byte buffers from application code.
+ // Other ways to express this are also possible, e.g. with a 'closed' state checked when queue goes empty.
+ private static final ByteBuffer CLOSE_STREAM_BUFFER = ByteBuffer.allocate(0);
+
+ private final Object monitor = new Object();
+
+ // GuardedBy("monitor")
+ private State state = State.NOT_STARTED;
+
+ // GuardedBy("state")
+ private final ServletOutputStream outputStream;
+ private final Executor executor;
+
+ // GuardedBy("monitor")
+ private final Deque<ResponseContentPart> responseContentQueue = new ArrayDeque<>();
+
+ private final RequestMetricReporter metricReporter;
+
+ /**
+ * When this future completes there will be no more calls against the servlet output stream or servlet response.
+ * The framework is still allowed to invoke us though.
+ *
+ * The future might complete in the servlet framework thread, user thread or executor thread.
+ */
+ final CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
+
+
+ public ServletOutputStreamWriter(ServletOutputStream outputStream, Executor executor, RequestMetricReporter metricReporter) {
+ this.outputStream = outputStream;
+ this.executor = executor;
+ this.metricReporter = metricReporter;
+ }
+
+ public void sendErrorContentAndCloseAsync(ByteBuffer errorContent) {
+ synchronized (monitor) {
+ // Assert that no content has been written as it is too late to write error response if the response is committed.
+ assertStateIs(state, State.NOT_STARTED);
+ queueErrorContent_holdingLock(errorContent);
+ state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK;
+ outputStream.setWriteListener(writeListener);
+ }
+ }
+
+ private void queueErrorContent_holdingLock(ByteBuffer errorContent) {
+ responseContentQueue.addLast(new ResponseContentPart(errorContent, NOOP_COMPLETION_HANDLER));
+ responseContentQueue.addLast(new ResponseContentPart(CLOSE_STREAM_BUFFER, NOOP_COMPLETION_HANDLER));
+ }
+
+ public void writeBuffer(ByteBuffer buf, CompletionHandler handler) {
+ boolean thisThreadShouldWrite = false;
+
+ synchronized (monitor) {
+ if (state == State.FINISHED_OR_ERROR) {
+ executor.execute(() -> handler.failed(new IllegalStateException("ContentChannel already closed.")));
+ return;
+ }
+ responseContentQueue.addLast(new ResponseContentPart(buf, handler));
+ switch (state) {
+ case NOT_STARTED:
+ state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK;
+ outputStream.setWriteListener(writeListener);
+ break;
+ case WAITING_FOR_WRITE_POSSIBLE_CALLBACK:
+ case WRITING_BUFFERS:
+ break;
+ case WAITING_FOR_BUFFER:
+ thisThreadShouldWrite = true;
+ state = State.WRITING_BUFFERS;
+ break;
+ default:
+ throw new IllegalStateException("Invalid state " + state);
+ }
+ }
+
+ if (thisThreadShouldWrite) {
+ writeBuffersInQueueToOutputStream();
+ }
+ }
+
+ public void close(CompletionHandler handler) {
+ writeBuffer(CLOSE_STREAM_BUFFER, handler);
+ }
+
+ public void close() {
+ close(NOOP_COMPLETION_HANDLER);
+ }
+
+ private void writeBuffersInQueueToOutputStream() {
+ boolean lastOperationWasFlush = false;
+
+ while (true) {
+ ResponseContentPart contentPart;
+
+ synchronized (monitor) {
+ if (state == State.FINISHED_OR_ERROR) {
+ return;
+ }
+ assertStateIs(state, State.WRITING_BUFFERS);
+
+ if (!outputStream.isReady()) {
+ state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK;
+ return;
+ }
+
+ contentPart = responseContentQueue.pollFirst();
+
+ if (contentPart == null && lastOperationWasFlush) {
+ state = State.WAITING_FOR_BUFFER;
+ return;
+ }
+ }
+
+ try {
+ boolean isFlush = contentPart == null;
+ if (isFlush) {
+ outputStream.flush();
+ lastOperationWasFlush = true;
+ continue;
+ }
+ lastOperationWasFlush = false;
+
+ if (contentPart.buf == CLOSE_STREAM_BUFFER) {
+ callCompletionHandlerWhenDone(contentPart.handler, outputStream::close);
+ setFinished(Optional.empty());
+ return;
+ } else {
+ writeBufferToOutputStream(contentPart);
+ }
+ } catch (Throwable e) {
+ setFinished(Optional.of(e));
+ return;
+ }
+ }
+ }
+
+ private void setFinished(Optional<Throwable> e) {
+ synchronized (monitor) {
+ state = State.FINISHED_OR_ERROR;
+ if (!responseContentQueue.isEmpty()) {
+ failAllParts_holdingLock(e.orElse(new IllegalStateException("ContentChannel closed.")));
+ }
+ }
+
+ assert !Thread.holdsLock(monitor);
+ if (e.isPresent()) {
+ finishedFuture.completeExceptionally(e.get());
+ } else {
+ finishedFuture.complete(null);
+ }
+ }
+
+ private void failAllParts_holdingLock(Throwable e) {
+ assert Thread.holdsLock(monitor);
+
+ ArrayList<ResponseContentPart> failedParts = new ArrayList<>(responseContentQueue);
+ responseContentQueue.clear();
+
+ @SuppressWarnings("ThrowableInstanceNeverThrown")
+ RuntimeException failReason = new RuntimeException("Failing due to earlier ServletOutputStream write failure", e);
+
+ Consumer<ResponseContentPart> failCompletionHandler = responseContentPart ->
+ runCompletionHandler_logOnExceptions(
+ () -> responseContentPart.handler.failed(failReason));
+
+ executor.execute(
+ () -> failedParts.forEach(failCompletionHandler));
+ }
+
+ private void writeBufferToOutputStream(ResponseContentPart contentPart) throws Throwable {
+ callCompletionHandlerWhenDone(contentPart.handler, () -> {
+ ByteBuffer buffer = contentPart.buf;
+ final int bytesToSend = buffer.remaining();
+ try {
+ if (buffer.hasArray()) {
+ outputStream.write(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+ } else {
+ final byte[] array = new byte[buffer.remaining()];
+ buffer.get(array);
+ outputStream.write(array);
+ }
+ metricReporter.successfulWrite(bytesToSend);
+ } catch (Throwable throwable) {
+ metricReporter.failedWrite();
+ throw throwable;
+ }
+ });
+ }
+
+ private static void callCompletionHandlerWhenDone(CompletionHandler handler, IORunnable runnable) throws Exception {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ runCompletionHandler_logOnExceptions(() -> handler.failed(e));
+ throw e;
+ }
+ handler.completed(); //Might throw an exception, handling in the enclosing scope.
+ }
+
+ private static void runCompletionHandler_logOnExceptions(Runnable runnable) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.log(Level.WARNING, "Unexpected exception from CompletionHandler.", e);
+ }
+ }
+
+ private static void assertStateIs(State currentState, State expectedState) {
+ if (currentState != expectedState) {
+ AssertionError error = new AssertionError("Expected state " + expectedState + ", got state " + currentState);
+ log.log(Level.WARNING, "Assertion failed.", error);
+ throw error;
+ }
+ }
+
+ public void fail(Throwable t) {
+ setFinished(Optional.of(t));
+ }
+
+ private final WriteListener writeListener = new WriteListener() {
+ @Override
+ public void onWritePossible() throws IOException {
+ synchronized (monitor) {
+ if (state == State.FINISHED_OR_ERROR) {
+ return;
+ }
+
+ assertStateIs(state, State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK);
+ state = State.WRITING_BUFFERS;
+ }
+
+ writeBuffersInQueueToOutputStream();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ setFinished(Optional.of(t));
+ }
+ };
+
+ private static class ResponseContentPart {
+ public final ByteBuffer buf;
+ public final CompletionHandler handler;
+
+ public ResponseContentPart(ByteBuffer buf, CompletionHandler handler) {
+ this.buf = buf;
+ this.handler = handler;
+ }
+ }
+
+ @FunctionalInterface
+ private interface IORunnable {
+ void run() throws IOException;
+ }
+}