aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java
Publish
Diffstat (limited to 'jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java')
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java3143
1 files changed, 3143 insertions, 0 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java b/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java
new file mode 100644
index 00000000000..ca52f3ab95b
--- /dev/null
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java
@@ -0,0 +1,3143 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.test;
+
+import com.google.common.annotations.Beta;
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.google.inject.util.Modules;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.application.BindingSetSelector;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.ServerProvider;
+
+import javax.annotation.CheckReturnValue;
+import java.io.Closeable;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+@SuppressWarnings("UnusedDeclaration")
+@Beta
+public abstract class ServerProviderConformanceTest {
+ private static final int NUM_RUNS_EACH_TEST = 10;
+
+ /**
+ * <p>This interface declares the adapter between the general conformance test and an actual <tt>ServerProvider</tt>
+ * implementation. Every test runs as follows:</p>
+ * <ol>
+ * <li>{@link #newConfigModule()} is called to bind server-specific configuration.</li>
+ * <li>{@link #getServerProviderClass()} is called, and guice is asked to construct an instance of that class.</li>
+ * <li>{@link #newClient(ServerProvider)} is called one or more times as required by the test case.</li>
+ * <li>{@link #executeRequest(Object, boolean)} is called one or more times per client, as required by the test case.</li>
+ * <li>{@link #validateResponse(Object)} is called once per call to {@link #executeRequest(Object, boolean)}.</li>
+ * </ol>
+ *
+ * @param <T> The <tt>ServerProvider</tt> under test.
+ * @param <U> An object that represents a remote client that can connect to the server.
+ * @param <V> An object that holds the response generated by the client when executing a request.
+ */
+ public interface Adapter<T extends ServerProvider, U, V> {
+
+ Module newConfigModule();
+
+ Class<T> getServerProviderClass();
+
+ U newClient(T server) throws Throwable;
+
+ V executeRequest(U client, boolean withRequestContent) throws Throwable;
+
+ Iterable<ByteBuffer> newResponseContent();
+
+ void validateResponse(V response) throws Throwable;
+ }
+
+ /**
+ * <p>An instance of this exception is thrown within the conformance tests that imply that they will throw an
+ * exception. If your <tt>ServerProvider</tt> is capable of exposing such information, then this class is what you
+ * need to look for in the output.</p>
+ */
+ public static class ConformanceException extends RuntimeException {
+ private final Event peekEvent;
+
+ public ConformanceException() {
+ peekEvent = null;
+ }
+
+ /**
+ * In some tests, we want to ensure that a thrown exception has been handled by the framework before
+ * we do something else. There is no official hook to receive notification that the framework has
+ * handled an exception, but we assume (actually know) that the message of the exception will be
+ * accessed to create an error message. The provided event will signal that the exception
+ * has been _looked at_ by the framework, which we treat as synonymous with "handled" (due to
+ * synchronization in the framework, it is).
+ */
+ public ConformanceException(final Event peekEvent) {
+ this.peekEvent = peekEvent;
+ }
+
+ @Override
+ public String getMessage() {
+ if (peekEvent != null) {
+ peekEvent.happened();
+ }
+ return super.getMessage();
+ }
+ }
+
+ /* The following section declares and implements all test cases for the ServerProvider conformance test. When
+ * subclassing this test, you must implement these methods, annotate them as test methods and call runTest()
+ * from within each of them with an appropriate adapter instance.
+ *
+ * The test set up various scenarios with successes, failures and exceptions in different places and with
+ * different timing. There are many dimensions to test across, hence some really long method names. Some
+ * notes about the naming "scheme":
+ * - "testRequest<Something>" means the funky stuff happens in the handleRequest() method.
+ * - "testRequestContent<Something>" indicates that the funky stuff happens in the request content channel's code.
+ * - "testResponse<Something>" indicates that the funky stuff happens with the response content channel.
+ * - "Failure" means that failed() is called on some completion handler (the method name should indicate which).
+ * - "Nondeterministic" exception/failure means that it can occur before, during or after writing response content.
+ * The reason we include non-deterministic tests is that the deterministic ones involve synchronization, which
+ * may hide race conditions in the underlying processing. So we want some tests that "run free" as well.
+ * - "WithSync<Something>" means that anything NOT mentioned happens asynchronously, in a different thread.
+ * - "Before"/"After" is significant in some protocols; e.g. in http, status and headers are committed at one point.
+ * - "NoContent" refers to response content.
+ * There are quite likely possible scenarios that are not tested, but this is a good portion.
+ */
+
+ public abstract void testContainerNotReadyException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testContainerNotReadyException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.override(Modules.combine(config)).with(newActivateContainer(false)),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ throw new AssertionError();
+ }
+ });
+ }
+
+ public abstract void testBindingSetNotFoundException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testBindingSetNotFoundException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.override(Modules.combine()).with(newBindingSetSelector("unknown")),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ throw new AssertionError();
+ }
+ });
+ }
+
+ public abstract void testNoBindingSetSelectedException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testNoBindingSetSelectedException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.override(Modules.combine()).with(newBindingSetSelector(null)),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ throw new AssertionError();
+ }
+ });
+ }
+
+ public abstract void testBindingNotFoundException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testBindingNotFoundException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.override(Modules.combine(config)).with(newServerBinding("not://found/")),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ throw new AssertionError();
+ }
+ });
+ }
+
+ public abstract void testRequestHandlerWithSyncCloseResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestHandlerWithSyncCloseResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ }
+ });
+ }
+
+ public abstract void testRequestHandlerWithSyncWriteResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestHandlerWithSyncWriteResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ closeResponseInOtherThread(out);
+ return null;
+ }
+ });
+ }
+
+ public abstract void testRequestHandlerWithSyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestHandlerWithSyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+ return null;
+ }
+ });
+ }
+
+ public abstract void testRequestHandlerWithAsyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestHandlerWithAsyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return null;
+ }
+ });
+ }
+
+ public abstract void testRequestException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionWithSyncCloseResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionWithSyncCloseResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ closeResponse(out);
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionWithSyncWriteResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionWithSyncWriteResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ closeResponseInOtherThread(out);
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestNondeterministicExceptionWithSyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionAfterResponseWriteWithSyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestNondeterministicExceptionWithAsyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ closeResponse(out);
+ } catch (Throwable ignored) {
+
+ }
+ return null;
+ }
+ });
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ exceptionHandledByFramework.await();
+ try {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ exceptionHandledByFramework.await();
+ writeResponse(out);
+ closeResponse(out);
+ } catch (Throwable ignored) {
+
+ }
+ return null;
+ }
+ });
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ try {
+ respondNoContent(handler);
+ } catch (Throwable ignored) {
+
+ }
+ return null;
+ });
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ }
+ });
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ completeInOtherThread(handler);
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithNondeterministicSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.failed(new ConformanceException());
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithSyncFailureBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event failDone = new Event();
+ callInOtherThread(() -> {
+ failDone.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ try {
+ handler.failed(new ConformanceException());
+ } finally {
+ failDone.happened();
+ }
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithSyncFailureAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ responseWritten.await();
+ handler.failed(new ConformanceException());
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithNondeterministicAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ handler.failed(new ConformanceException());
+ return null;
+ });
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithAsyncFailureBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event failDone = new Event();
+ callInOtherThread(() -> {
+ failDone.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ try {
+ handler.failed(new ConformanceException());
+ } finally {
+ failDone.happened();
+ }
+ return null;
+ });
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithAsyncFailureAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseWritten.await();
+ handler.failed(new ConformanceException());
+ return null;
+ });
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseClosed.await();
+ handler.failed(new ConformanceException());
+ return null;
+ });
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteNondeterministicException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteNondeterministicException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteNondeterministicExceptionWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithNondeterministicSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.failed(new ConformanceException());
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event failDone = new Event();
+ callInOtherThread(() -> {
+ failDone.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ try {
+ handler.failed(new ConformanceException());
+ } finally {
+ failDone.happened();
+ }
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ responseWritten.await();
+ handler.failed(new ConformanceException());
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ responseClosed.await();
+ handler.failed(new ConformanceException());
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithNondeterministicAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ fail(handler, new ConformanceException(), IllegalStateException.class);
+ return null;
+ });
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandled = new Event();
+
+ callInOtherThread(() -> {
+ exceptionHandled.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ fail(handler, new ConformanceException(exceptionHandled), IllegalStateException.class);
+ return null;
+ });
+ throw new ConformanceException(exceptionHandled);
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseWritten.await();
+ fail(handler, new ConformanceException(), IllegalStateException.class);
+ return null;
+ });
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITH_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseClosed.await();
+ fail(handler, new ConformanceException(), IllegalStateException.class);
+ return null;
+ });
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ completeInOtherThread(handler);
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithNondeterministicSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.failed(new ConformanceException());
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithSyncFailureBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event failDone = new Event();
+ callInOtherThread(() -> {
+ failDone.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ try {
+ handler.failed(new ConformanceException());
+ } finally {
+ failDone.happened();
+ }
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithSyncFailureAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ responseWritten.await();
+ handler.failed(new ConformanceException());
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ responseClosed.await();
+ handler.failed(new ConformanceException());
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithNondeterministicAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ failInOtherThread(handler, new ConformanceException());
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithAsyncFailureBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final Event failDone = new Event();
+ callInOtherThread(() -> {
+ failDone.await();
+ respondWithContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ try {
+ fail(handler, new ConformanceException());
+ } finally {
+ failDone.happened();
+ }
+ return null;
+ });
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithAsyncFailureAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ respondWithContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseWritten.await();
+ fail(handler, new ConformanceException());
+ return null;
+ });
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ respondNoContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseClosed.await();
+ fail(handler, new ConformanceException());
+ return null;
+ });
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseNondeterministicException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseNondeterministicException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionBeforeResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseWrite(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseCloseNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseNondeterministicExceptionWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ respondWithContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ respondWithContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ respondNoContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.completed();
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final Event exceptionHandledByFramework = new Event();
+ callInOtherThread(() -> {
+ exceptionHandledByFramework.await();
+ respondWithContent(handler);
+ return null;
+ });
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ throw new ConformanceException(exceptionHandledByFramework);
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondWithContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ respondNoContentInOtherThread(handler);
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ completeInOtherThread(handler, IllegalStateException.class);
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseNondeterministicExceptionWithSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ handler.failed(new ConformanceException());
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event failDone = new Event();
+ callInOtherThread(() -> {
+ failDone.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ try {
+ handler.failed(new ConformanceException());
+ } finally {
+ failDone.happened();
+ }
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ responseWritten.await();
+ handler.failed(new ConformanceException());
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ responseClosed.await();
+ handler.failed(new ConformanceException());
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseNondeterministicExceptionWithAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ fail(handler, new ConformanceException(), IllegalStateException.class);
+ return null;
+ });
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ final Event exceptionHandled = new Event();
+
+ callInOtherThread(() -> {
+ exceptionHandled.await();
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+
+ callInOtherThread(() -> {
+ fail(handler, new ConformanceException(exceptionHandled), IllegalStateException.class);
+ return null;
+ });
+
+ throw new ConformanceException(exceptionHandled);
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ writeResponse(out);
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseWritten.await();
+ fail(handler, new ConformanceException(), IllegalStateException.class);
+ return null;
+ });
+ responseWritten.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable;
+ private <T extends ServerProvider, U, V> void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(final Request request, final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+
+ return new ContentChannel() {
+
+ @Override
+ public void write(final ByteBuffer buf, final CompletionHandler handler) {
+ handler.completed();
+ }
+
+ @Override
+ public void close(final CompletionHandler handler) {
+ callInOtherThread(() -> {
+ responseClosed.await();
+ fail(handler, new ConformanceException(), IllegalStateException.class);
+ return null;
+ });
+ responseClosed.await();
+ throw new ConformanceException();
+ }
+ };
+ }
+ });
+ }
+
+ public abstract void testResponseWriteCompletionException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testResponseWriteCompletionException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ for (ByteBuffer buf : adapter.newResponseContent()) {
+ out.write(buf, EXCEPTION_COMPLETION_HANDLER);
+ }
+ closeResponse(out);
+ return null;
+ }
+ });
+ }
+
+ public abstract void testResponseCloseCompletionException() throws Throwable;
+ private <T extends ServerProvider, U, V> void testResponseCloseCompletionException(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ out.close(EXCEPTION_COMPLETION_HANDLER);
+ return null;
+ }
+ });
+ }
+
+ public abstract void testResponseCloseCompletionExceptionNoContent() throws Throwable;
+ private <T extends ServerProvider, U, V> void testResponseCloseCompletionExceptionNoContent(
+ final Adapter<T, U, V> adapter,
+ final Module... config)
+ throws Throwable {
+ runTest(adapter,
+ Modules.combine(config),
+ RequestType.WITHOUT_CONTENT,
+ new TestRequestHandler() {
+
+ @Override
+ public ContentChannel handle(Request request, ResponseHandler handler) {
+ ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ out.close(EXCEPTION_COMPLETION_HANDLER);
+ return null;
+ }
+ });
+ }
+
+ // -------------------------------------------------------------------------------------------------------------- //
+ // //
+ // The following section is implementation details that are not necessary to understand in order to implement a //
+ // conformance test for a ServerProvider. //
+ // //
+ // -------------------------------------------------------------------------------------------------------------- //
+
+ protected <T extends ServerProvider, U, V> void runTest(
+ final Adapter<T, U, V> adapter,
+ final Module... guiceModules)
+ throws Throwable {
+ Class<ServerProviderConformanceTest> clazz = ServerProviderConformanceTest.class;
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ for (StackTraceElement element : stack) {
+ Method method;
+ final String methodName = element.getMethodName();
+ try {
+ method = clazz.getDeclaredMethod(methodName);
+ } catch (NoSuchMethodException e) {
+ continue;
+ }
+ if (!Modifier.isAbstract(method.getModifiers())) {
+ continue;
+ }
+ try {
+ method = clazz.getDeclaredMethod(methodName, Adapter.class, Module[].class);
+ System.out.println("Invoking test method " + methodName);
+ method.invoke(this, adapter, guiceModules);
+ return;
+ } catch (InvocationTargetException e) {
+ throw e.getCause();
+ }
+ }
+ throw new UnsupportedOperationException("Method runTest() not called from overridden testXXX() method.");
+ }
+
+ // The only purpose of this is to avoid magic literals in calls to runTest (which we'd have if we used a bool flag).
+ private enum RequestType {
+ WITHOUT_CONTENT, WITH_CONTENT
+ }
+
+ private <T extends ServerProvider, U, V> void runTest(
+ final Adapter<T, U, V> adapter,
+ final Module testConfig,
+ final RequestType requestType,
+ final TestRequestHandler requestHandler)
+ throws Throwable {
+ final Module config = Modules.override(newDefaultConfig(), adapter.newConfigModule()).with(testConfig);
+ final TestDriver driver = TestDriver.newSimpleApplicationInstance(config);
+ final ContainerBuilder builder = driver.newContainerBuilder();
+ builder.serverBindings().bind(builder.getInstance(Key.get(String.class, Names.named("serverBinding"))),
+ requestHandler);
+ final T serverProvider = builder.guiceModules().getInstance(adapter.getServerProviderClass());
+ builder.serverProviders().install(serverProvider);
+ if (builder.getInstance(Key.get(Boolean.class, Names.named("activateContainer")))) {
+ driver.activateContainer(builder);
+ }
+ serverProvider.start();
+ serverProvider.release();
+
+ for (int i = 0; i < NUM_RUNS_EACH_TEST; ++i) {
+ System.out.println("Test run #" + i);
+ requestHandler.reset(adapter.newResponseContent());
+ final U client = adapter.newClient(serverProvider);
+ final boolean withRequestContent = requestType == RequestType.WITH_CONTENT;
+ final V result = adapter.executeRequest(client, withRequestContent);
+ adapter.validateResponse(result);
+ if (client instanceof Closeable) {
+ ((Closeable) client).close();
+ }
+ requestHandler.awaitAsyncTasks();
+ }
+
+ serverProvider.close();
+ driver.close();
+ }
+
+ private static Module newDefaultConfig() {
+ return Modules.combine(newServerBinding("*://*/*"),
+ newActivateContainer(true));
+ }
+
+ private static Module newBindingSetSelector(final String bindingSetName) {
+ return new AbstractModule() {
+
+ @Override
+ protected void configure() {
+ bind(BindingSetSelector.class).toInstance(uri -> bindingSetName);
+ }
+ };
+ }
+
+ private static Module newServerBinding(final String serverBinding) {
+ return new AbstractModule() {
+
+ @Override
+ protected void configure() {
+ bind(String.class).annotatedWith(Names.named("serverBinding")).toInstance(serverBinding);
+ }
+ };
+ }
+
+ private static Module newActivateContainer(final boolean activateContainer) {
+ return new AbstractModule() {
+
+ @Override
+ protected void configure() {
+ bind(Boolean.class).annotatedWith(Names.named("activateContainer")).toInstance(activateContainer);
+ }
+ };
+ }
+
+ /**
+ * Wrapper around CountDownLatch for single-occurrence events.
+ */
+ private static class Event {
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ public void happened() {
+ latch.countDown();
+ }
+
+ public void await() {
+ try {
+ final boolean success = latch.await(600, TimeUnit.SECONDS);
+ if (!success) {
+ throw new IllegalStateException("Wait for required condition timed out");
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Wait for required condition was interrupted", e);
+ }
+ }
+ }
+
+ private static abstract class TestRequestHandler extends AbstractRequestHandler {
+
+ private static class TaskHandle {
+ private final Exception stackTrace = new Exception();
+
+ @Override
+ public String toString() {
+ final StringWriter stringWriter = new StringWriter();
+ stackTrace.printStackTrace(new PrintWriter(stringWriter));
+ return "(" + stringWriter.toString() + ")";
+ }
+ }
+
+ protected Event responseWritten;
+ protected Event responseClosed;
+ private ExecutorService executor;
+ private final Object taskMonitor = new Object();
+ private Set<TaskHandle> pendingTasks = new HashSet<>();
+ private Exception taskException;
+ private Iterable<ByteBuffer> responseContent;
+
+ public void reset(final Iterable<ByteBuffer> responseContent) {
+ synchronized (taskMonitor) {
+ if (!pendingTasks.isEmpty()) {
+ throw new AssertionError("pendingTasks should be empty, was " + pendingTasks);
+ }
+ }
+ this.executor = Executors.newCachedThreadPool();
+ this.responseWritten = new Event();
+ this.responseClosed = new Event();
+ this.responseContent = responseContent;
+ this.taskException = null;
+ }
+
+ protected final void callInOtherThread(final Callable<Void> task) {
+ final TaskHandle taskHandle = addTask();
+ final Runnable runnable = () -> {
+ try {
+ task.call();
+ } catch (Exception e) {
+ setTaskFailure(e);
+ }
+ removeTask(taskHandle);
+ };
+ try {
+ executor.submit(runnable);
+ } catch (RejectedExecutionException e) {
+ setTaskFailure(e);
+ removeTask(taskHandle);
+ }
+ }
+
+ private void setTaskFailure(Exception e) {
+ synchronized (taskMonitor) {
+ if (taskException == null) {
+ taskException = e;
+ } else {
+ System.out.println("Got subsequent exception in task execution: ");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void removeTask(final TaskHandle taskHandle) {
+ synchronized (taskMonitor) {
+ pendingTasks.remove(taskHandle);
+ taskMonitor.notifyAll();
+ }
+ }
+
+ @CheckReturnValue
+ private TaskHandle addTask() {
+ final TaskHandle taskHandle = new TaskHandle();
+ synchronized (taskMonitor) {
+ pendingTasks.add(taskHandle);
+ }
+ return taskHandle;
+ }
+
+ protected final void writeResponse(final ContentChannel out) {
+ try {
+ writeAll(out, responseContent);
+ } finally {
+ responseWritten.happened();
+ }
+ }
+
+ private void writeAll(final ContentChannel out, final Iterable<ByteBuffer> content) {
+ for (ByteBuffer buf : content) {
+ out.write(buf, newCompletionHandler());
+ }
+ }
+
+ protected final void closeResponseInOtherThread(final ContentChannel out) {
+ callInOtherThread(() -> {
+ closeResponse(out);
+ return null;
+ });
+ }
+
+ protected final void closeResponse(final ContentChannel out) {
+ try {
+ out.close(newCompletionHandler());
+ } finally {
+ responseClosed.happened();
+ }
+ }
+
+ protected final CompletionHandler newCompletionHandler() {
+ final CallableCompletionHandler handler = new CallableCompletionHandler();
+ callInOtherThread(handler);
+ return handler;
+ }
+
+ protected final void respondWithContentInOtherThread(final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ respondWithContent(handler);
+ return null;
+ });
+ }
+
+ protected final void respondNoContentInOtherThread(final ResponseHandler handler) {
+ callInOtherThread(() -> {
+ respondNoContent(handler);
+ return null;
+ });
+ }
+
+ protected void respondWithContent(final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ writeResponse(out);
+ closeResponse(out);
+ }
+
+ protected void respondNoContent(final ResponseHandler handler) {
+ final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
+ closeResponse(out);
+ }
+
+ protected final void completeInOtherThread(
+ final CompletionHandler handler,
+ final Class<?>... allowedExceptionTypes) {
+ callInOtherThread(() -> {
+ try {
+ handler.completed();
+ } catch (Throwable t) {
+ if (!isInstanceOfAnyOf(t, allowedExceptionTypes)) {
+ throw t;
+ }
+ }
+ return null;
+ });
+ }
+
+ protected final void fail(
+ final CompletionHandler handler,
+ final Throwable failure,
+ final Class<?>... allowedExceptionTypes) {
+ try {
+ handler.failed(failure);
+ } catch (Throwable t) {
+ if (!isInstanceOfAnyOf(t, allowedExceptionTypes)) {
+ throw t;
+ }
+ }
+ }
+
+ private static boolean isInstanceOfAnyOf(final Object object, final Class<?>... types) {
+ return Stream.of(types).anyMatch(type -> type.isAssignableFrom(object.getClass()));
+ }
+
+ protected final void failInOtherThread(
+ final CompletionHandler handler,
+ final Throwable failure,
+ final Class<?>... allowedExceptionTypes) {
+ callInOtherThread(() -> {
+ fail(handler, failure, allowedExceptionTypes);
+ return null;
+ });
+ }
+
+ @Override
+ public final ContentChannel handleRequest(final Request request, final ResponseHandler responseHandler) {
+ // Ensure that task executor is not shut down before handleResponse() is done.
+ final TaskHandle handleResponseTask = addTask();
+ try {
+ final ContentChannel requestContentChannel = handle(request, responseHandler);
+ if (requestContentChannel == null) {
+ return null;
+ }
+ // Ensure that task executor is not shut down before close() is done.
+ final TaskHandle requestContentChannelCloseTask = addTask();
+ return new ContentChannel() {
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ requestContentChannel.write(buf, handler);
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ try {
+ requestContentChannel.close(handler);
+ } finally {
+ removeTask(requestContentChannelCloseTask);
+ }
+ }
+ };
+ } finally {
+ removeTask(handleResponseTask);
+ }
+ }
+
+ protected abstract ContentChannel handle(Request request, ResponseHandler responseHandler);
+
+ public final void awaitAsyncTasks() throws Exception {
+ final long maxWaitTimeMillis = 600_000L;
+ final long startTimeMillis = System.currentTimeMillis();
+ synchronized (taskMonitor) {
+ while (!pendingTasks.isEmpty()) {
+ final long currentTimeMillis = System.currentTimeMillis();
+ final long timeElapsedMillis = currentTimeMillis - startTimeMillis;
+ if (timeElapsedMillis >= maxWaitTimeMillis) {
+ throw new IllegalStateException(
+ "Wait timed out, still have the following pending tasks: " + pendingTasks);
+ }
+ final long waitTimeMillis = maxWaitTimeMillis - timeElapsedMillis;
+ taskMonitor.wait(waitTimeMillis);
+ }
+ }
+ executor.shutdown();
+ final boolean haltedCleanly = executor.awaitTermination(600, TimeUnit.SECONDS);
+ if (!haltedCleanly) {
+ throw new IllegalStateException("Some tasks did not finish. executor=" + executor);
+ }
+ synchronized (taskMonitor) {
+ if (taskException != null) {
+ throw new Exception("Task threw exception", taskException);
+ }
+ }
+ }
+ }
+
+ private static class CallableCompletionHandler implements Callable<Void>, CompletionHandler {
+
+ final CountDownLatch done = new CountDownLatch(1);
+
+ @Override
+ public void completed() {
+ done.countDown();
+ }
+
+ @Override
+ public void failed(final Throwable t) {
+ done.countDown();
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (!done.await(600, TimeUnit.SECONDS)) {
+ throw new TimeoutException();
+ }
+ return null;
+ }
+ }
+
+ private static final CompletionHandler EXCEPTION_COMPLETION_HANDLER = new CompletionHandler() {
+
+ @Override
+ public void completed() {
+ throw new ConformanceException();
+ }
+
+ @Override
+ public void failed(Throwable t) {
+ throw new ConformanceException();
+ }
+ };
+}