// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jdisc.test;
import com.yahoo.api.annotations.Beta;
import com.google.inject.AbstractModule;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.application.BindingSetSelector;
import com.yahoo.jdisc.application.ContainerBuilder;
import com.yahoo.jdisc.handler.AbstractRequestHandler;
import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.service.ServerProvider;
import java.io.Closeable;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.stream.Stream;
/**
* @author Simon Thoresen Hult
*/
@SuppressWarnings("UnusedDeclaration")
@Beta
public abstract class ServerProviderConformanceTest {
// Logger named after this class.
private static final Logger log = Logger.getLogger(ServerProviderConformanceTest.class.getName());
// Repetition count for each test case; presumably read by the runner logic further down in this
// file (not visible here) -- TODO confirm.
private static final int NUM_RUNS_EACH_TEST = 1;
/**
*
This interface declares the adapter between the general conformance test and an actual ServerProvider
* implementation. Every test runs as follows:
*
* - {@link #newConfigModule()} is called to bind server-specific configuration.
* - {@link #getServerProviderClass()} is called, and guice is asked to construct an instance of that class.
* - {@link #newClient(ServerProvider)} is called one or more times as required by the test case.
* - {@link #executeRequest(Object, boolean)} is called one or more times per client, as required by the test case.
* - {@link #validateResponse(Object)} is called once per call to {@link #executeRequest(Object, boolean)}.
*
*
* @param The ServerProvider
under test.
* @param An object that represents a remote client that can connect to the server.
* @param An object that holds the response generated by the client when executing a request.
*/
public interface Adapter {
Module newConfigModule();
Class getServerProviderClass();
U newClient(T server) throws Throwable;
V executeRequest(U client, boolean withRequestContent) throws Throwable;
Iterable newResponseContent();
void validateResponse(V response) throws Throwable;
}
/**
 * Thrown by those conformance tests whose scenario involves an exception. If your
 * <code>ServerProvider</code> is capable of exposing such information, this is the class to look
 * for in its output.
 */
public static class ConformanceException extends RuntimeException {

    // Signalled each time getMessage() is called; null when nobody needs to be notified.
    private final Event peekEvent;

    /** Creates an exception that signals no event when its message is read. */
    public ConformanceException() {
        this(null);
    }

    /**
     * Creates an exception that signals the given event whenever its message is read.
     *
     * Some tests must know that a thrown exception has been handled by the framework before they
     * proceed. There is no official notification hook for that, but the framework is known to read
     * the exception message in order to build an error message. The event therefore reports that
     * the exception has been _looked at_, which is treated as equivalent to "handled" (the
     * framework's synchronization makes that so).
     */
    public ConformanceException(final Event peekEvent) {
        this.peekEvent = peekEvent;
    }

    @Override
    public String getMessage() {
        final Event observer = peekEvent;
        if (observer == null) {
            return super.getMessage();
        }
        // Signal first, then hand out the message.
        observer.happened();
        return super.getMessage();
    }
}
/* The following section declares and implements all test cases for the ServerProvider conformance test. When
* subclassing this test, you must implement these methods, annotate them as test methods and call runTest()
* from within each of them with an appropriate adapter instance.
*
* The tests set up various scenarios with successes, failures and exceptions in different places and with
* different timing. There are many dimensions to test across, hence some really long method names. Some
* notes about the naming "scheme":
* - "testRequest" means the funky stuff happens in the handleRequest() method.
* - "testRequestContent" indicates that the funky stuff happens in the request content channel's code.
* - "testResponse" indicates that the funky stuff happens with the response content channel.
* - "Failure" means that failed() is called on some completion handler (the method name should indicate which).
* - "Nondeterministic" exception/failure means that it can occur before, during or after writing response content.
* The reason we include non-deterministic tests is that the deterministic ones involve synchronization, which
* may hide race conditions in the underlying processing. So we want some tests that "run free" as well.
* - "WithSync" means that anything NOT mentioned happens asynchronously, in a different thread.
* - "Before"/"After" is significant in some protocols; e.g. in http, status and headers are committed at one point.
* - "NoContent" refers to response content.
* There are quite likely possible scenarios that are not tested, but this is a good portion.
*/
/**
 * Scenario: the supplied configuration is overridden with {@code newActivateContainer(false)}.
 * The request handler raises an {@link AssertionError} if it is ever invoked.
 */
public abstract void testContainerNotReadyException() throws Throwable;

private void testContainerNotReadyException(final Adapter adapter, final Module... config)
        throws Throwable {
    // Caller-supplied modules form the base; the override is layered on top.
    final Module module = Modules.override(Modules.combine(config)).with(newActivateContainer(false));
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(Request req, ResponseHandler responseHandler) {
            throw new AssertionError();
        }
    });
}
/**
 * Scenario: the supplied configuration is overridden with {@code newBindingSetSelector("unknown")}.
 * The request handler raises an {@link AssertionError} if it is ever invoked.
 */
public abstract void testBindingSetNotFoundException() throws Throwable;

private void testBindingSetNotFoundException(final Adapter adapter, final Module... config)
        throws Throwable {
    // The caller's modules must form the base of the override; combining nothing would silently
    // drop the configuration passed in by the subclass.
    final Module module = Modules.override(Modules.combine(config)).with(newBindingSetSelector("unknown"));
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(Request request, ResponseHandler handler) {
            throw new AssertionError();
        }
    });
}
/**
 * Scenario: the supplied configuration is overridden with {@code newBindingSetSelector(null)}.
 * The request handler raises an {@link AssertionError} if it is ever invoked.
 */
public abstract void testNoBindingSetSelectedException() throws Throwable;

private void testNoBindingSetSelectedException(final Adapter adapter, final Module... config)
        throws Throwable {
    // The caller's modules must form the base of the override; combining nothing would silently
    // drop the configuration passed in by the subclass.
    final Module module = Modules.override(Modules.combine(config)).with(newBindingSetSelector(null));
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(Request request, ResponseHandler handler) {
            throw new AssertionError();
        }
    });
}
/**
 * Scenario: the supplied configuration is overridden with {@code newServerBinding("not://found/")}.
 * The request handler raises an {@link AssertionError} if it is ever invoked.
 */
public abstract void testBindingNotFoundException() throws Throwable;

private void testBindingNotFoundException(final Adapter adapter, final Module... config)
        throws Throwable {
    // Caller-supplied modules form the base; the override is layered on top.
    final Module module = Modules.override(Modules.combine(config)).with(newServerBinding("not://found/"));
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(Request req, ResponseHandler responseHandler) {
            throw new AssertionError();
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response and then calls {@code writeResponse} and
 * {@code closeResponse} directly, before returning {@code null} as request content channel.
 */
public abstract void testRequestHandlerWithSyncCloseResponse() throws Throwable;

private void testRequestHandlerWithSyncCloseResponse(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            writeResponse(responseContent);
            closeResponse(responseContent);
            return null;
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response and calls {@code writeResponse} directly;
 * closing is delegated to {@code closeResponseInOtherThread}.
 */
public abstract void testRequestHandlerWithSyncWriteResponse() throws Throwable;

private void testRequestHandlerWithSyncWriteResponse(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            writeResponse(responseContent);
            closeResponseInOtherThread(responseContent);
            return null;
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response directly, while {@code writeResponse} and
 * {@code closeResponse} are submitted through {@code callInOtherThread}.
 */
public abstract void testRequestHandlerWithSyncHandleResponse() throws Throwable;

private void testRequestHandlerWithSyncHandleResponse(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return null;
        }
    });
}
/**
 * Scenario: {@code handle()} delegates the entire response to
 * {@code respondWithContentInOtherThread} and returns {@code null} as request content channel.
 */
public abstract void testRequestHandlerWithAsyncHandleResponse() throws Throwable;

private void testRequestHandlerWithAsyncHandleResponse(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return null;
        }
    });
}
/**
 * Scenario: {@code handle()} throws a {@link ConformanceException} without touching the response
 * handler.
 */
public abstract void testRequestException() throws Throwable;

private void testRequestException(final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(Request req, ResponseHandler responseHandler) {
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response, calls {@code writeResponse} and
 * {@code closeResponse} directly, and then throws a {@link ConformanceException}.
 */
public abstract void testRequestExceptionWithSyncCloseResponse() throws Throwable;

private void testRequestExceptionWithSyncCloseResponse(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            writeResponse(responseContent);
            closeResponse(responseContent);
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response, calls {@code writeResponse} directly,
 * delegates closing to {@code closeResponseInOtherThread}, and then throws a
 * {@link ConformanceException}.
 */
public abstract void testRequestExceptionWithSyncWriteResponse() throws Throwable;

private void testRequestExceptionWithSyncWriteResponse(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            writeResponse(responseContent);
            closeResponseInOtherThread(responseContent);
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response directly, submits {@code writeResponse} and
 * {@code closeResponse} through {@code callInOtherThread}, and throws a
 * {@link ConformanceException} without waiting for that task.
 */
public abstract void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable;

private void testRequestNondeterministicExceptionWithSyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            // No synchronization with the task above: the throw is unordered relative to it.
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response directly and throws a
 * {@link ConformanceException}. The task that calls {@code writeResponse} and
 * {@code closeResponse} first awaits the event that the exception signals when its message is read.
 */
public abstract void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable;

private void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            final Event exceptionSeen = new Event();
            callInOtherThread(() -> {
                // Hold back the response content until the exception's message has been read.
                exceptionSeen.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            throw new ConformanceException(exceptionSeen);
        }
    });
}
/**
 * Scenario: {@code handle()} dispatches an OK response directly, submits {@code writeResponse} and
 * {@code closeResponse} through {@code callInOtherThread}, awaits {@code responseWritten}, and
 * only then throws a {@link ConformanceException}.
 */
public abstract void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable;

private void testRequestExceptionAfterResponseWriteWithSyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            // Block until responseWritten is signalled, then fail.
            responseWritten.await();
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: {@code handle()} submits a task through {@code callInOtherThread} that dispatches an
 * OK response, writes response content and closes it, and then throws a
 * {@link ConformanceException} without waiting for that task. The exception may therefore occur
 * before, during or after the response content is written.
 */
public abstract void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable;

private void testRequestNondeterministicExceptionWithAsyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    runTest(adapter,
            Modules.combine(config),
            RequestType.WITHOUT_CONTENT,
            new TestRequestHandler() {

                @Override
                public ContentChannel handle(final Request request, final ResponseHandler handler) {
                    callInOtherThread(() -> {
                        try {
                            final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
                            // This is not a "NoContent" scenario: write content before closing,
                            // as every other nondeterministic test does.
                            writeResponse(out);
                            closeResponse(out);
                        } catch (Throwable ignored) {
                            // Deliberately ignored: this task is unordered relative to the
                            // exception thrown below, so any of the calls above may fail.
                        }
                        return null;
                    });
                    throw new ConformanceException();
                }
            });
}
/**
 * Scenario: {@code handle()} throws a {@link ConformanceException} carrying an event. A task
 * submitted through {@code callInOtherThread} awaits that event before it dispatches an OK
 * response, writes response content and closes it.
 */
public abstract void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable;

private void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    runTest(adapter,
            Modules.combine(config),
            RequestType.WITHOUT_CONTENT,
            new TestRequestHandler() {

                @Override
                public ContentChannel handle(final Request request, final ResponseHandler handler) {
                    final Event exceptionHandledByFramework = new Event();
                    callInOtherThread(() -> {
                        // One await is enough: once the event has been signalled, nothing
                        // below needs to wait for it again.
                        exceptionHandledByFramework.await();
                        try {
                            final ContentChannel out = handler.handleResponse(new Response(Response.Status.OK));
                            writeResponse(out);
                            closeResponse(out);
                        } catch (Throwable ignored) {
                            // Deliberately ignored: the exception thrown below has already been
                            // looked at, so the calls above may be rejected.
                        }
                        return null;
                    });
                    throw new ConformanceException(exceptionHandledByFramework);
                }
            });
}
/**
 * Scenario: {@code handle()} submits {@code respondNoContent} through {@code callInOtherThread}
 * (ignoring anything it throws), awaits {@code responseClosed}, and then throws a
 * {@link ConformanceException}.
 */
public abstract void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable;

private void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            callInOtherThread(() -> {
                try {
                    respondNoContent(responseHandler);
                } catch (Throwable ignored) {
                    // Best effort only; failures of the responder are not part of this scenario.
                }
                return null;
            });
            // Block until responseClosed is signalled, then fail.
            responseClosed.await();
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: {@code handle()} submits a task through {@code callInOtherThread} that dispatches an
 * OK response and calls {@code writeResponse} and {@code closeResponse}; it then awaits
 * {@code responseWritten} and throws a {@link ConformanceException}.
 */
public abstract void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable;

private void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITHOUT_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            callInOtherThread(() -> {
                final Response ok = new Response(Response.Status.OK);
                final ContentChannel responseContent = responseHandler.handleResponse(ok);
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            // Block until responseWritten is signalled, then fail.
            responseWritten.await();
            throw new ConformanceException();
        }
    });
}
/**
 * Scenario: the response is delegated to {@code respondWithContentInOtherThread}; the request
 * content channel calls {@code completed()} directly from both {@code write} and {@code close}.
 */
public abstract void testRequestContentWriteWithSyncCompletion() throws Throwable;

private void testRequestContentWriteWithSyncCompletion(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: the response is delegated to {@code respondWithContentInOtherThread}; the request
 * content channel hands each write's completion to {@code completeInOtherThread} and calls
 * {@code completed()} directly from {@code close}.
 */
public abstract void testRequestContentWriteWithAsyncCompletion() throws Throwable;

private void testRequestContentWriteWithAsyncCompletion(final Adapter adapter, final Module... config)
        throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completeInOtherThread(completion);
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. The request content channel
 * fails every write directly with a {@link ConformanceException}, unsynchronized with that task.
 */
public abstract void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable;

private void testRequestContentWriteWithNondeterministicSyncFailure(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.failed(new ConformanceException());
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly. The request content channel fails each write
 * directly with a {@link ConformanceException} and then signals an event; the task that calls
 * {@code writeResponse} and {@code closeResponse} awaits that event first.
 */
public abstract void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable;

private void testRequestContentWriteWithSyncFailureBeforeResponseWrite(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            final Event failureReported = new Event();
            callInOtherThread(() -> {
                // Hold back the response content until failed() has returned (or thrown).
                failureReported.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    try {
                        completion.failed(new ConformanceException());
                    } finally {
                        // Signal even if failed() throws, so the waiting task is always released.
                        failureReported.happened();
                    }
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * awaits {@code responseWritten} and then fails directly with a {@link ConformanceException}.
 */
public abstract void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable;

private void testRequestContentWriteWithSyncFailureAfterResponseWrite(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    // Block until responseWritten is signalled, then report the failure.
                    responseWritten.await();
                    completion.failed(new ConformanceException());
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * submits a second task that fails the write with a {@link ConformanceException}; the two tasks
 * are not synchronized with each other.
 */
public abstract void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable;

private void testRequestContentWriteWithNondeterministicAsyncFailure(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        completion.failed(new ConformanceException());
                        return null;
                    });
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly. Each request content write submits a task that
 * fails the write with a {@link ConformanceException} and then signals an event; the task that
 * calls {@code writeResponse} and {@code closeResponse} awaits that event first.
 */
public abstract void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable;

private void testRequestContentWriteWithAsyncFailureBeforeResponseWrite(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            final Event failureReported = new Event();
            callInOtherThread(() -> {
                // Hold back the response content until failed() has returned (or thrown).
                failureReported.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        try {
                            completion.failed(new ConformanceException());
                        } finally {
                            // Signal even if failed() throws, so the waiting task is released.
                            failureReported.happened();
                        }
                        return null;
                    });
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * submits a task that awaits {@code responseWritten} and then fails the write with a
 * {@link ConformanceException}.
 */
public abstract void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable;

private void testRequestContentWriteWithAsyncFailureAfterResponseWrite(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        // Block until responseWritten is signalled, then report the failure.
                        responseWritten.await();
                        completion.failed(new ConformanceException());
                        return null;
                    });
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly and closed, without {@code writeResponse}, by a
 * task submitted through {@code callInOtherThread}. Each request content write submits a task that
 * awaits {@code responseClosed} and then fails the write with a {@link ConformanceException}.
 */
public abstract void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable;

private void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        // Block until responseClosed is signalled, then report the failure.
                        responseClosed.await();
                        completion.failed(new ConformanceException());
                        return null;
                    });
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * throws a {@link ConformanceException}, unsynchronized with that task.
 */
public abstract void testRequestContentWriteNondeterministicException() throws Throwable;

private void testRequestContentWriteNondeterministicException(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly. Each request content write throws a
 * {@link ConformanceException} carrying an event; the task that calls {@code writeResponse} and
 * {@code closeResponse} awaits that event first.
 */
public abstract void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable;

private void testRequestContentWriteExceptionBeforeResponseWrite(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            final Event exceptionSeen = new Event();
            callInOtherThread(() -> {
                // Hold back the response content until the exception's message has been read.
                exceptionSeen.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    throw new ConformanceException(exceptionSeen);
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * awaits {@code responseWritten} and then throws a {@link ConformanceException}.
 */
public abstract void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable;

private void testRequestContentWriteExceptionAfterResponseWrite(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    // Block until responseWritten is signalled, then throw.
                    responseWritten.await();
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly and closed, without {@code writeResponse}, by a
 * task submitted through {@code callInOtherThread}. Each request content write awaits
 * {@code responseClosed} and then throws a {@link ConformanceException}.
 */
public abstract void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable;

private void testRequestContentWriteExceptionAfterResponseCloseNoContent(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    // Block until responseClosed is signalled, then throw.
                    responseClosed.await();
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * first calls {@code completed()} and then throws a {@link ConformanceException}, unsynchronized
 * with that task.
 */
public abstract void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable;

private void testRequestContentWriteNondeterministicExceptionWithSyncCompletion(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly. Each request content write calls
 * {@code completed()} and then throws a {@link ConformanceException} carrying an event; the task
 * that calls {@code writeResponse} and {@code closeResponse} awaits that event first.
 */
public abstract void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable;

private void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            final Event exceptionSeen = new Event();
            callInOtherThread(() -> {
                // Hold back the response content until the exception's message has been read.
                exceptionSeen.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                    throw new ConformanceException(exceptionSeen);
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * calls {@code completed()}, awaits {@code responseWritten}, and then throws a
 * {@link ConformanceException}.
 */
public abstract void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable;

private void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                    // Block until responseWritten is signalled, then throw.
                    responseWritten.await();
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly and closed, without {@code writeResponse}, by a
 * task submitted through {@code callInOtherThread}. Each request content write calls
 * {@code completed()}, awaits {@code responseClosed}, and then throws a
 * {@link ConformanceException}.
 */
public abstract void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable;

private void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                    // Block until responseClosed is signalled, then throw.
                    responseClosed.await();
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: an OK response is dispatched directly, with {@code writeResponse} and
 * {@code closeResponse} submitted through {@code callInOtherThread}. Each request content write
 * calls {@code completeInOtherThread(handler, IllegalStateException.class)} and then throws a
 * {@link ConformanceException}, unsynchronized with the response task.
 */
public abstract void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable;

private void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion(
        final Adapter adapter, final Module... config) throws Throwable {
    final Module module = Modules.combine(config);
    runTest(adapter, module, RequestType.WITH_CONTENT, new TestRequestHandler() {

        @Override
        public ContentChannel handle(final Request req, final ResponseHandler responseHandler) {
            final Response ok = new Response(Response.Status.OK);
            final ContentChannel responseContent = responseHandler.handleResponse(ok);
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {

                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    // NOTE(review): the exception class presumably names a failure the helper
                    // tolerates when completing -- confirm against completeInOtherThread.
                    completeInOtherThread(completion, IllegalStateException.class);
                    throw new ConformanceException();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    });
}
/**
 * Scenario: the request content {@code write} passes its handler to {@code completeInOtherThread} and
 * throws a {@code ConformanceException} carrying an {@code Event}; the responding task awaits that event
 * before writing and closing the response.
 */
public abstract void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            // NOTE(review): presumably signalled once the framework has dealt with the exception - confirm in ConformanceException.
            final Event frameworkHandledException = new Event();
            callInOtherThread(() -> {
                frameworkHandledException.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    throw new ConformanceException(frameworkHandledException);
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} passes its handler to {@code completeInOtherThread}, waits
 * on {@code responseWritten} and then throws; the response is written and closed via
 * {@code callInOtherThread}.
 */
public abstract void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    // Do not throw until the responseWritten event has been observed.
                    responseWritten.await();
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} passes its handler to {@code completeInOtherThread}, waits
 * on {@code responseClosed} and then throws; the response is closed (without a write) via
 * {@code callInOtherThread}.
 */
public abstract void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    // Do not throw until the responseClosed event has been observed.
                    responseClosed.await();
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} fails its handler synchronously and then throws; the
 * response is written and closed via {@code callInOtherThread} with no explicit synchronization against
 * either action.
 */
public abstract void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithNondeterministicSyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.failed(new ConformanceException());
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} fails its handler synchronously, signals a local
 * {@code Event} (even if {@code failed} throws) and then throws; the responding task awaits that event
 * before writing and closing the response.
 */
public abstract void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            final Event failureReported = new Event();
            callInOtherThread(() -> {
                failureReported.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    try {
                        completion.failed(new ConformanceException());
                    } finally {
                        // Always release the responding task, whatever failed() did.
                        failureReported.happened();
                    }
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} waits on {@code responseWritten}, fails its handler
 * synchronously and then throws; the response is written and closed via {@code callInOtherThread}.
 */
public abstract void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    // Neither fail nor throw until the responseWritten event has been observed.
                    responseWritten.await();
                    completion.failed(new ConformanceException());
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} waits on {@code responseClosed}, fails its handler
 * synchronously and then throws; the response is closed (without a write) via {@code callInOtherThread}.
 */
public abstract void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    // Neither fail nor throw until the responseClosed event has been observed.
                    responseClosed.await();
                    completion.failed(new ConformanceException());
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} schedules a {@code fail(...)} of its handler via
 * {@code callInOtherThread} (passing {@code IllegalStateException.class}) and throws immediately; the
 * response is written and closed via a separate {@code callInOtherThread} task with no explicit
 * synchronization.
 */
public abstract void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithNondeterministicAsyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        fail(completion, new ConformanceException(), IllegalStateException.class);
                        return null;
                    });
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} schedules a {@code fail(...)} of its handler via
 * {@code callInOtherThread} and throws; both the failure and the thrown exception are
 * {@code ConformanceException}s carrying the same {@code Event}, which the responding task awaits before
 * writing and closing the response.
 */
public abstract void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            // NOTE(review): presumably signalled when one of the exceptions below is handled - confirm in ConformanceException.
            final Event frameworkHandledException = new Event();
            callInOtherThread(() -> {
                frameworkHandledException.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        fail(completion, new ConformanceException(frameworkHandledException), IllegalStateException.class);
                        return null;
                    });
                    throw new ConformanceException(frameworkHandledException);
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} schedules a task via {@code callInOtherThread} that waits on
 * {@code responseWritten} and then calls {@code fail(...)} on the handler; {@code write} itself also waits
 * on {@code responseWritten} before throwing. The response is written and closed via
 * {@code callInOtherThread}.
 */
public abstract void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        responseWritten.await();
                        fail(completion, new ConformanceException(), IllegalStateException.class);
                        return null;
                    });
                    // The throwing path waits on the same event as the failing task.
                    responseWritten.await();
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code write} schedules a task via {@code callInOtherThread} that waits on
 * {@code responseClosed} and then calls {@code fail(...)} on the handler; {@code write} itself also waits
 * on {@code responseClosed} before throwing. The response is closed (without a write) via
 * {@code callInOtherThread}.
 */
public abstract void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITH_CONTENT}. */
private void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        responseClosed.await();
                        fail(completion, new ConformanceException(), IllegalStateException.class);
                        return null;
                    });
                    // The throwing path waits on the same event as the failing task.
                    responseClosed.await();
                    throw new ConformanceException();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITH_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} completes its handler synchronously; the response is
 * produced by {@code respondWithContentInOtherThread}.
 */
public abstract void testRequestContentCloseWithSyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithSyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} passes its handler to {@code completeInOtherThread}; the
 * response is produced by {@code respondWithContentInOtherThread}.
 */
public abstract void testRequestContentCloseWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completeInOtherThread(completion);
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} fails its handler synchronously; the response is written
 * and closed via {@code callInOtherThread} with no explicit synchronization against the failure.
 */
public abstract void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithNondeterministicSyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.failed(new ConformanceException());
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} fails its handler synchronously and then signals a local
 * {@code Event} (even if {@code failed} throws); the responding task awaits that event before writing and
 * closing the response.
 */
public abstract void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithSyncFailureBeforeResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            final Event failureReported = new Event();
            callInOtherThread(() -> {
                failureReported.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    try {
                        completion.failed(new ConformanceException());
                    } finally {
                        // Always release the responding task, whatever failed() did.
                        failureReported.happened();
                    }
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} waits on {@code responseWritten} and then fails its handler
 * synchronously; the response is written and closed via {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithSyncFailureAfterResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    // Do not fail until the responseWritten event has been observed.
                    responseWritten.await();
                    completion.failed(new ConformanceException());
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} waits on {@code responseClosed} and then fails its handler
 * synchronously; the response is closed (without a write) via {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    // Do not fail until the responseClosed event has been observed.
                    responseClosed.await();
                    completion.failed(new ConformanceException());
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} passes its handler and a new {@code ConformanceException}
 * to {@code failInOtherThread}; the response is produced by {@code respondWithContentInOtherThread} with
 * no explicit synchronization between the two.
 */
public abstract void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithNondeterministicAsyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    failInOtherThread(completion, new ConformanceException());
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} schedules a task via {@code callInOtherThread} that calls
 * {@code fail(...)} on the handler and then signals a local {@code Event} (even if {@code fail} throws);
 * the responding task awaits that event before calling {@code respondWithContent}.
 */
public abstract void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithAsyncFailureBeforeResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final Event failureReported = new Event();
            callInOtherThread(() -> {
                failureReported.await();
                respondWithContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        try {
                            fail(completion, new ConformanceException());
                        } finally {
                            // Always release the responding task, whatever fail() did.
                            failureReported.happened();
                        }
                        return null;
                    });
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} schedules a task via {@code callInOtherThread} that waits on
 * {@code responseWritten} and then calls {@code fail(...)} on the handler; the response is produced by
 * {@code respondWithContent} in a separate {@code callInOtherThread} task.
 */
public abstract void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithAsyncFailureAfterResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            callInOtherThread(() -> {
                respondWithContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        // Do not fail until the responseWritten event has been observed.
                        responseWritten.await();
                        fail(completion, new ConformanceException());
                        return null;
                    });
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} schedules a task via {@code callInOtherThread} that waits on
 * {@code responseClosed} and then calls {@code fail(...)} on the handler; the response is produced by
 * {@code respondNoContent} in a separate {@code callInOtherThread} task.
 */
public abstract void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            callInOtherThread(() -> {
                respondNoContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    callInOtherThread(() -> {
                        // Do not fail until the responseClosed event has been observed.
                        responseClosed.await();
                        fail(completion, new ConformanceException());
                        return null;
                    });
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} throws without touching its handler; the response is
 * written and closed via {@code callInOtherThread} with no explicit synchronization against the throw.
 */
public abstract void testRequestContentCloseNondeterministicException() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseNondeterministicException(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} throws a {@code ConformanceException} carrying an
 * {@code Event} without touching its handler; the responding task awaits that event before writing and
 * closing the response.
 */
public abstract void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionBeforeResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            // NOTE(review): presumably signalled once the framework has dealt with the exception - confirm in ConformanceException.
            final Event frameworkHandledException = new Event();
            callInOtherThread(() -> {
                frameworkHandledException.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    throw new ConformanceException(frameworkHandledException);
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} waits on {@code responseWritten} and then throws without
 * touching its handler; the response is written and closed via {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseWrite(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    // Do not throw until the responseWritten event has been observed.
                    responseWritten.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} waits on {@code responseClosed} and then throws without
 * touching its handler; the response is closed (without a write) via {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseCloseNoContent(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    // Do not throw until the responseClosed event has been observed.
                    responseClosed.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} completes its handler synchronously and then throws; the
 * response is produced by {@code respondWithContentInOtherThread} with no explicit synchronization against
 * the throw.
 */
public abstract void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseNondeterministicExceptionWithSyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} completes its handler synchronously and then throws a
 * {@code ConformanceException} carrying an {@code Event}; the responding task awaits that event before
 * calling {@code respondWithContent}.
 */
public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            // NOTE(review): presumably signalled once the framework has dealt with the exception - confirm in ConformanceException.
            final Event frameworkHandledException = new Event();
            callInOtherThread(() -> {
                frameworkHandledException.await();
                respondWithContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                    throw new ConformanceException(frameworkHandledException);
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} completes its handler synchronously, waits on
 * {@code responseWritten} and then throws; the response is produced by {@code respondWithContent} via
 * {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            callInOtherThread(() -> {
                respondWithContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                    // Do not throw until the responseWritten event has been observed.
                    responseWritten.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} completes its handler synchronously, waits on
 * {@code responseClosed} and then throws; the response is produced by {@code respondNoContent} via
 * {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            callInOtherThread(() -> {
                respondNoContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.completed();
                    // Do not throw until the responseClosed event has been observed.
                    responseClosed.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} passes its handler to {@code completeInOtherThread}
 * (together with {@code IllegalStateException.class}) and throws immediately; the response is produced by
 * {@code respondWithContentInOtherThread} with no explicit synchronization against the throw.
 */
public abstract void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} passes its handler to {@code completeInOtherThread} and
 * throws a {@code ConformanceException} carrying an {@code Event}; the responding task awaits that event
 * before calling {@code respondWithContent}.
 */
public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            // NOTE(review): presumably signalled once the framework has dealt with the exception - confirm in ConformanceException.
            final Event frameworkHandledException = new Event();
            callInOtherThread(() -> {
                frameworkHandledException.await();
                respondWithContent(responseHandler);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    throw new ConformanceException(frameworkHandledException);
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} passes its handler to {@code completeInOtherThread}, waits
 * on {@code responseWritten} and then throws; the response is produced by
 * {@code respondWithContentInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondWithContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    // Do not throw until the responseWritten event has been observed.
                    responseWritten.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} passes its handler to {@code completeInOtherThread}, waits
 * on {@code responseClosed} and then throws; the response is produced by
 * {@code respondNoContentInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            respondNoContentInOtherThread(responseHandler);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completeInOtherThread(completion, IllegalStateException.class);
                    // Do not throw until the responseClosed event has been observed.
                    responseClosed.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} fails its handler synchronously and then throws; the
 * response is written and closed via {@code callInOtherThread} with no explicit synchronization against
 * either action.
 */
public abstract void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseNondeterministicExceptionWithSyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    completion.failed(new ConformanceException());
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} fails its handler synchronously, signals a local
 * {@code Event} (even if {@code failed} throws) and then throws; the responding task awaits that event
 * before writing and closing the response.
 */
public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            final Event failureReported = new Event();
            callInOtherThread(() -> {
                failureReported.await();
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    try {
                        completion.failed(new ConformanceException());
                    } finally {
                        // Always release the responding task, whatever failed() did.
                        failureReported.happened();
                    }
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Scenario: the request content {@code close} waits on {@code responseWritten}, fails its handler
 * synchronously and then throws; the response is written and closed via {@code callInOtherThread}.
 */
public abstract void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable;

/** Builds the request handler for this scenario and runs it through {@code runTest} with {@code RequestType.WITHOUT_CONTENT}. */
private void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler responseHandler) {
            final ContentChannel responseContent = responseHandler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseContent);
                closeResponse(responseContent);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer data, final CompletionHandler completion) {
                    completion.completed();
                }
                @Override
                public void close(final CompletionHandler completion) {
                    // Neither fail nor throw until the responseWritten event has been observed.
                    responseWritten.await();
                    completion.failed(new ConformanceException());
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable;

/**
 * Scenario: the response is closed without content from another thread, while the request content channel's
 * close() waits for that response close before failing its completion handler synchronously and throwing.
 */
private void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            closeResponseInOtherThread(responseChannel);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer chunk, final CompletionHandler completion) {
                    completion.completed();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    // Do not fail until the response has been closed.
                    responseClosed.await();
                    completion.failed(new ConformanceException());
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable;

/**
 * Scenario: the response is written and closed from another thread with no ordering relative to the request
 * content channel's close(), which fails its completion handler from yet another thread and throws.
 */
private void testRequestContentCloseNondeterministicExceptionWithAsyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseChannel);
                closeResponse(responseChannel);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer chunk, final CompletionHandler completion) {
                    completion.completed();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    // An IllegalStateException from failed() is tolerated; anything else fails the task.
                    callInOtherThread(() -> {
                        fail(completion, new ConformanceException(), IllegalStateException.class);
                        return null;
                    });
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable;

/**
 * Scenario: the request content channel's close() fails its completion handler from another thread and
 * throws; the response is written and closed only once the {@code exceptionHandled} event has happened.
 * That event is handed to both ConformanceException instances; how they signal it is defined outside this
 * method.
 */
private void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            final Event exceptionHandled = new Event();
            callInOtherThread(() -> {
                exceptionHandled.await();
                writeResponse(responseChannel);
                closeResponse(responseChannel);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer chunk, final CompletionHandler completion) {
                    completion.completed();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    // An IllegalStateException from failed() is tolerated; anything else fails the task.
                    callInOtherThread(() -> {
                        fail(completion, new ConformanceException(exceptionHandled), IllegalStateException.class);
                        return null;
                    });
                    throw new ConformanceException(exceptionHandled);
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable;

/**
 * Scenario: the response is written and closed from another thread; the request content channel's close()
 * waits for the response write before throwing, and fails its completion handler from another thread that
 * also waits for the response write first.
 */
private void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            callInOtherThread(() -> {
                writeResponse(responseChannel);
                closeResponse(responseChannel);
                return null;
            });
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer chunk, final CompletionHandler completion) {
                    completion.completed();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    // An IllegalStateException from failed() is tolerated; anything else fails the task.
                    callInOtherThread(() -> {
                        responseWritten.await();
                        fail(completion, new ConformanceException(), IllegalStateException.class);
                        return null;
                    });
                    responseWritten.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable;

/**
 * Scenario: the response is closed without content from another thread; the request content channel's
 * close() waits for that response close before throwing, and fails its completion handler from another
 * thread that also waits for the response close first.
 */
private void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(final Request request, final ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            closeResponseInOtherThread(responseChannel);
            return new ContentChannel() {
                @Override
                public void write(final ByteBuffer chunk, final CompletionHandler completion) {
                    completion.completed();
                }

                @Override
                public void close(final CompletionHandler completion) {
                    // An IllegalStateException from failed() is tolerated; anything else fails the task.
                    callInOtherThread(() -> {
                        responseClosed.await();
                        fail(completion, new ConformanceException(), IllegalStateException.class);
                        return null;
                    });
                    responseClosed.await();
                    throw new ConformanceException();
                }
            };
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testResponseWriteCompletionException() throws Throwable;

/**
 * Scenario: every response buffer is written with a completion handler that throws from its callbacks; the
 * response is then closed normally and no request content channel is returned.
 */
private void testResponseWriteCompletionException(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(Request request, ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            for (ByteBuffer chunk : adapter.newResponseContent()) {
                responseChannel.write(chunk, EXCEPTION_COMPLETION_HANDLER);
            }
            closeResponse(responseChannel);
            return null;
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testResponseCloseCompletionException() throws Throwable;

/**
 * Scenario: the response content is written normally, but the response is closed with a completion handler
 * that throws from its callbacks. No request content channel is returned.
 */
private void testResponseCloseCompletionException(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(Request request, ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            writeResponse(responseChannel);
            responseChannel.close(EXCEPTION_COMPLETION_HANDLER);
            return null;
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
/**
 * Hook to be overridden by the concrete conformance test. The protected runTest() finds this abstract
 * declaration by name on the call stack and invokes the private overload below.
 */
public abstract void testResponseCloseCompletionExceptionNoContent() throws Throwable;

/**
 * Scenario: the response is closed without writing any content, using a completion handler that throws from
 * its callbacks. No request content channel is returned.
 */
private void testResponseCloseCompletionExceptionNoContent(
        final Adapter adapter,
        final Module... config)
        throws Throwable {
    final Module combinedConfig = Modules.combine(config);
    final TestRequestHandler scenario = new TestRequestHandler() {
        @Override
        public ContentChannel handle(Request request, ResponseHandler handler) {
            final ContentChannel responseChannel = handler.handleResponse(new Response(Response.Status.OK));
            responseChannel.close(EXCEPTION_COMPLETION_HANDLER);
            return null;
        }
    };
    runTest(adapter, combinedConfig, RequestType.WITHOUT_CONTENT, scenario);
}
// -------------------------------------------------------------------------------------------------------------- //
// //
// The following section contains implementation details that you do not need to understand in order to //
// implement a conformance test for a ServerProvider. //
// //
// -------------------------------------------------------------------------------------------------------------- //
/**
 * Entry point for the overridden testXXX() methods. Walks the current call stack looking for a method name
 * that this class declares as a no-argument abstract method, then reflectively invokes the overload of the
 * same name taking {@code (Adapter, Module[])}.
 *
 * @param adapter      adapter passed on to the private test method
 * @param guiceModules modules passed on to the private test method
 * @throws UnsupportedOperationException if no frame on the call stack matches an abstract test method
 * @throws Throwable whatever the invoked test method throws (unwrapped from InvocationTargetException)
 */
protected void runTest(
        final Adapter adapter,
        final Module... guiceModules)
        throws Throwable {
    final Class<?> testClass = ServerProviderConformanceTest.class;
    for (final StackTraceElement frame : Thread.currentThread().getStackTrace()) {
        final String candidateName = frame.getMethodName();
        final Method declaration;
        try {
            declaration = testClass.getDeclaredMethod(candidateName);
        } catch (NoSuchMethodException e) {
            // This frame is not a no-argument method declared by the conformance test class.
            continue;
        }
        if (Modifier.isAbstract(declaration.getModifiers())) {
            final Method implementation =
                    testClass.getDeclaredMethod(candidateName, Adapter.class, Module[].class);
            System.out.println("Invoking test method " + candidateName);
            try {
                implementation.invoke(this, adapter, guiceModules);
            } catch (InvocationTargetException e) {
                // Surface the test's own failure rather than the reflection wrapper.
                throw e.getCause();
            }
            return;
        }
    }
    throw new UnsupportedOperationException("Method runTest() not called from overridden testXXX() method.");
}
/**
 * Says whether the test request carries content. It exists only so that calls to runTest avoid magic
 * literals, which a boolean flag would introduce.
 */
private enum RequestType {
    WITHOUT_CONTENT,
    WITH_CONTENT
}
/**
 * Runs one conformance scenario: builds a container with the given request handler bound, starts the
 * adapter's server provider, executes and validates NUM_RUNS_EACH_TEST requests, then shuts everything down.
 *
 * NOTE(review): T, U and V are used below but not declared in this signature as it appears here; presumably
 * they are generic type parameters of the method and of Adapter. Confirm against the original declaration.
 *
 * @param adapter        supplies configuration, the server provider class, the client and response validation
 * @param testConfig     bindings that take precedence over the default and adapter-provided configuration
 * @param requestType    whether the executed request should carry content
 * @param requestHandler scenario-specific handler bound to the configured server binding
 */
private void runTest(
final Adapter adapter,
final Module testConfig,
final RequestType requestType,
final TestRequestHandler requestHandler)
throws Throwable {
// Default + adapter configuration forms the base; testConfig overrides any binding in it.
final Module config = Modules.override(newDefaultConfig(), adapter.newConfigModule()).with(testConfig);
final TestDriver driver = TestDriver.newSimpleApplicationInstance(config);
final ContainerBuilder builder = driver.newContainerBuilder();
// The URI pattern to bind the handler to comes from the String named "serverBinding".
builder.serverBindings().bind(builder.getInstance(Key.get(String.class, Names.named("serverBinding"))),
requestHandler);
final T serverProvider = builder.guiceModules().getInstance(adapter.getServerProviderClass());
builder.serverProviders().install(serverProvider);
// Activation can be switched off through the Boolean named "activateContainer".
if (builder.getInstance(Key.get(Boolean.class, Names.named("activateContainer")))) {
driver.activateContainer(builder);
}
serverProvider.start();
// Presumably drops this method's own reference to the provider - TODO confirm against ServerProvider docs.
serverProvider.release();
for (int i = 0; i < NUM_RUNS_EACH_TEST; ++i) {
log.fine("Test run #" + i);
// Fresh events, executor and response content for every run.
requestHandler.reset(adapter.newResponseContent());
final U client = adapter.newClient(serverProvider);
final boolean withRequestContent = requestType == RequestType.WITH_CONTENT;
final V result = adapter.executeRequest(client, withRequestContent);
adapter.validateResponse(result);
if (client instanceof Closeable) {
((Closeable) client).close();
}
// Blocks until every task started by the handler has finished; rethrows the first task failure.
requestHandler.awaitAsyncTasks();
}
serverProvider.close();
driver.close();
}
/**
 * Creates the baseline configuration: the handler is bound to the catch-all pattern {@code *://*}{@code /*}
 * and the container is activated.
 */
private static Module newDefaultConfig() {
    final Module catchAllBinding = newServerBinding("*://*/*");
    final Module activation = newActivateContainer(true);
    return Modules.combine(catchAllBinding, activation);
}
/**
 * Creates a module binding a BindingSetSelector that returns the given binding set name for every URI.
 */
private static Module newBindingSetSelector(final String bindingSetName) {
    return new AbstractModule() {
        @Override
        protected void configure() {
            final BindingSetSelector constantSelector = ignoredUri -> bindingSetName;
            bind(BindingSetSelector.class).toInstance(constantSelector);
        }
    };
}
/**
 * Creates a module binding the String named "serverBinding" to the given value. The private runTest() reads
 * this value as the URI pattern the request handler is bound to.
 */
private static Module newServerBinding(final String serverBinding) {
    return new AbstractModule() {
        @Override
        protected void configure() {
            final Key<String> bindingKey = Key.get(String.class, Names.named("serverBinding"));
            bind(bindingKey).toInstance(serverBinding);
        }
    };
}
/**
 * Creates a module binding the Boolean named "activateContainer" to the given value. The private runTest()
 * reads this value to decide whether to activate the container.
 */
private static Module newActivateContainer(final boolean activateContainer) {
    return new AbstractModule() {
        @Override
        protected void configure() {
            final Key<Boolean> activationKey = Key.get(Boolean.class, Names.named("activateContainer"));
            bind(activationKey).toInstance(activateContainer);
        }
    };
}
/**
 * Wrapper around CountDownLatch for single-occurrence events.
 */
private static class Event {
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Marks the event as having occurred, releasing current and future callers of {@link #await()}.
     */
    public void happened() {
        latch.countDown();
    }

    /**
     * Blocks until {@link #happened()} has been called, for at most 600 seconds.
     *
     * @throws IllegalStateException if the wait times out, or if the thread is interrupted while waiting (in
     *                               which case the thread's interrupt status is restored before throwing)
     */
    public void await() {
        final boolean success;
        try {
            success = latch.await(600, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // The checked exception is converted, so restore the flag for code further up the stack.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Wait for required condition was interrupted", e);
        }
        if (!success) {
            throw new IllegalStateException("Wait for required condition timed out");
        }
    }
}
private static abstract class TestRequestHandler extends AbstractRequestHandler {
/**
 * Identity token for one pending task. It captures a stack trace at creation time so that toString() shows
 * where the task was registered.
 */
private static class TaskHandle {
    private final Exception stackTrace = new Exception();

    /**
     * Returns the creation-time stack trace wrapped in parentheses.
     */
    @Override
    public String toString() {
        final StringWriter buffer = new StringWriter();
        final PrintWriter printer = new PrintWriter(buffer);
        stackTrace.printStackTrace(printer);
        return "(" + buffer + ")";
    }
}
// Signalled by writeResponse() once all response content has been handed to the channel; recreated by reset().
protected Event responseWritten;
// Signalled by closeResponse() once close() on the response channel has returned or thrown; recreated by reset().
protected Event responseClosed;
// Runs the tasks submitted through callInOtherThread(); created by reset(), shut down by awaitAsyncTasks().
private ExecutorService executor;
// Lock guarding pendingTasks, and taskException everywhere except reset(); also the wait/notify monitor.
private final Object taskMonitor = new Object();
// Handles of tasks that have been registered but not yet removed.
private Set<TaskHandle> pendingTasks = new HashSet<>();
// First failure recorded by setTaskFailure(), or null if no task has failed since the last reset().
private Exception taskException;
// Buffers written by writeResponse(); supplied through reset().
private Iterable<ByteBuffer> responseContent;
/**
 * Prepares the handler for a new test run: creates a fresh executor and fresh responseWritten/responseClosed
 * events, stores the response content to write, and clears any recorded task failure.
 *
 * @param responseContent the buffers that writeResponse() will write
 * @throws AssertionError if tasks from a previous run are still pending
 */
public void reset(final Iterable<ByteBuffer> responseContent) {
    synchronized (taskMonitor) {
        if (!pendingTasks.isEmpty()) {
            throw new AssertionError("pendingTasks should be empty, was " + pendingTasks);
        }
    }
    this.executor = Executors.newCachedThreadPool();
    this.responseWritten = new Event();
    this.responseClosed = new Event();
    this.responseContent = responseContent;
    this.taskException = null;
}
/**
 * Runs the task on the handler's executor and tracks it as pending until it has finished, so that
 * awaitAsyncTasks() waits for it. A failure thrown by the task is recorded through setTaskFailure().
 *
 * The task is always removed from the pending set, also when it throws an Error; otherwise awaitAsyncTasks()
 * would block for its full timeout and the Error would never be reported.
 *
 * @param task the work to run; its return value is ignored
 */
protected final void callInOtherThread(final Callable<?> task) {
    final TaskHandle taskHandle = addTask();
    final Runnable runnable = () -> {
        try {
            task.call();
        } catch (Exception e) {
            setTaskFailure(e);
        } catch (Error e) {
            // taskException is an Exception, so wrap the Error to record it, then let it propagate as before.
            setTaskFailure(new RuntimeException("Task threw " + e.getClass().getName(), e));
            throw e;
        } finally {
            // Runs after the failure has been recorded, so a waiter that sees no pending tasks also sees it.
            removeTask(taskHandle);
        }
    };
    try {
        executor.submit(runnable);
    } catch (RejectedExecutionException e) {
        // The runnable will never run, so record the rejection and clean up here instead.
        setTaskFailure(e);
        removeTask(taskHandle);
    }
}
/**
 * Records a task failure. Only the first failure since the last reset() is kept; later ones are printed to
 * standard output and standard error and otherwise dropped.
 */
private void setTaskFailure(Exception e) {
    synchronized (taskMonitor) {
        if (taskException != null) {
            System.out.println("Got subsequent exception in task execution: ");
            e.printStackTrace();
            return;
        }
        taskException = e;
    }
}
/**
 * Removes the handle from the pending set and wakes every thread waiting on the task monitor.
 */
private void removeTask(final TaskHandle finishedTask) {
    synchronized (taskMonitor) {
        pendingTasks.remove(finishedTask);
        taskMonitor.notifyAll();
    }
}
/**
 * Registers a new pending task.
 *
 * @return the handle to pass to removeTask() once the task is done
 */
private TaskHandle addTask() {
    final TaskHandle registered = new TaskHandle();
    synchronized (taskMonitor) {
        pendingTasks.add(registered);
    }
    return registered;
}
/**
 * Writes the response content given to reset() to the channel, then signals responseWritten. The signal is
 * sent even if a write throws.
 */
protected final void writeResponse(final ContentChannel out) {
    try {
        writeAll(out, responseContent);
    } finally {
        // Signalled unconditionally so that waiters do not block when a write fails.
        responseWritten.happened();
    }
}
/**
 * Writes every buffer to the channel, in iteration order, each with its own tracked completion handler.
 *
 * @param out     the channel to write to
 * @param content the buffers to write
 */
private void writeAll(final ContentChannel out, final Iterable<ByteBuffer> content) {
    for (ByteBuffer buf : content) {
        out.write(buf, newCompletionHandler());
    }
}
/**
 * Calls closeResponse() for the channel from a tracked task on another thread.
 */
protected final void closeResponseInOtherThread(final ContentChannel out) {
    final Callable<Void> closeTask = () -> {
        closeResponse(out);
        return null;
    };
    callInOtherThread(closeTask);
}
/**
 * Closes the channel with a tracked completion handler, then signals responseClosed. The signal is sent even
 * if creating the handler or closing the channel throws.
 */
protected final void closeResponse(final ContentChannel out) {
    try {
        final CompletionHandler completion = newCompletionHandler();
        out.close(completion);
    } finally {
        // Signalled unconditionally so that waiters do not block when close() fails.
        responseClosed.happened();
    }
}
/**
 * Creates a completion handler that is tracked as a pending task: a task on another thread waits for the
 * handler to be completed or failed, so awaitAsyncTasks() does not return before that has happened.
 */
protected final CompletionHandler newCompletionHandler() {
    final CallableCompletionHandler tracked = new CallableCompletionHandler();
    callInOtherThread(tracked);
    return tracked;
}
/**
 * Calls respondWithContent() for the handler from a tracked task on another thread.
 */
protected final void respondWithContentInOtherThread(final ResponseHandler handler) {
    final Callable<Void> respondTask = () -> {
        respondWithContent(handler);
        return null;
    };
    callInOtherThread(respondTask);
}
/**
 * Calls respondNoContent() for the handler from a tracked task on another thread.
 */
protected final void respondNoContentInOtherThread(final ResponseHandler handler) {
    final Callable<Void> respondTask = () -> {
        respondNoContent(handler);
        return null;
    };
    callInOtherThread(respondTask);
}
/**
 * Sends an OK response, writes the response content to the returned channel, and closes it.
 */
protected void respondWithContent(final ResponseHandler handler) {
    final Response okResponse = new Response(Response.Status.OK);
    final ContentChannel responseChannel = handler.handleResponse(okResponse);
    writeResponse(responseChannel);
    closeResponse(responseChannel);
}
/**
 * Sends an OK response and closes the returned channel without writing any content.
 */
protected void respondNoContent(final ResponseHandler handler) {
    final Response okResponse = new Response(Response.Status.OK);
    closeResponse(handler.handleResponse(okResponse));
}
protected final void completeInOtherThread(
final CompletionHandler handler,
final Class>... allowedExceptionTypes) {
callInOtherThread(() -> {
try {
handler.completed();
} catch (Throwable t) {
if (!isInstanceOfAnyOf(t, allowedExceptionTypes)) {
throw t;
}
}
return null;
});
}
protected final void fail(
final CompletionHandler handler,
final Throwable failure,
final Class>... allowedExceptionTypes) {
try {
handler.failed(failure);
} catch (Throwable t) {
if (!isInstanceOfAnyOf(t, allowedExceptionTypes)) {
throw t;
}
}
}
private static boolean isInstanceOfAnyOf(final Object object, final Class>... types) {
return Stream.of(types).anyMatch(type -> type.isAssignableFrom(object.getClass()));
}
protected final void failInOtherThread(
final CompletionHandler handler,
final Throwable failure,
final Class>... allowedExceptionTypes) {
callInOtherThread(() -> {
fail(handler, failure, allowedExceptionTypes);
return null;
});
}
/**
 * Delegates to handle() while keeping the pending-task bookkeeping up to date: one task covers the handle()
 * call itself, and, if handle() returns a channel, a second task stays pending until that channel's close()
 * has been called.
 *
 * @return a wrapper around the channel returned by handle(), or null if handle() returned null
 */
@Override
public final ContentChannel handleRequest(final Request request, final ResponseHandler responseHandler) {
    // Ensure that task executor is not shut down before handle() is done.
    final TaskHandle handleInProgress = addTask();
    try {
        final ContentChannel delegate = handle(request, responseHandler);
        if (delegate != null) {
            // Ensure that task executor is not shut down before close() is done.
            final TaskHandle closePending = addTask();
            return new ContentChannel() {
                @Override
                public void write(ByteBuffer chunk, CompletionHandler completion) {
                    delegate.write(chunk, completion);
                }

                @Override
                public void close(CompletionHandler completion) {
                    try {
                        delegate.close(completion);
                    } finally {
                        removeTask(closePending);
                    }
                }
            };
        }
        return null;
    } finally {
        removeTask(handleInProgress);
    }
}
/**
 * Scenario-specific request handling, called by handleRequest().
 *
 * @param request         the incoming request
 * @param responseHandler the handler to send the response through
 * @return the channel to receive request content, or null; handleRequest() wraps a non-null channel and
 *         returns null when this returns null
 */
protected abstract ContentChannel handle(Request request, ResponseHandler responseHandler);
/**
 * Waits until no tasks are pending, shuts the executor down, and reports the first recorded task failure.
 *
 * @throws IllegalStateException if tasks are still pending after 600 seconds, or if the executor does not
 *                               terminate within a further 600 seconds
 * @throws Exception             wrapping the first task failure, if any task failed
 */
public final void awaitAsyncTasks() throws Exception {
    // nanoTime() is monotonic, so the deadline is unaffected by wall-clock adjustments during the wait.
    final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(600);
    synchronized (taskMonitor) {
        while (!pendingTasks.isEmpty()) {
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new IllegalStateException(
                        "Wait timed out, still have the following pending tasks: " + pendingTasks);
            }
            // remainingNanos is positive here, so this is always a bounded wait on the monitor.
            TimeUnit.NANOSECONDS.timedWait(taskMonitor, remainingNanos);
        }
    }
    executor.shutdown();
    final boolean haltedCleanly = executor.awaitTermination(600, TimeUnit.SECONDS);
    if (!haltedCleanly) {
        throw new IllegalStateException("Some tasks did not finish. executor=" + executor);
    }
    synchronized (taskMonitor) {
        if (taskException != null) {
            throw new Exception("Task threw exception", taskException);
        }
    }
}
}
/**
 * Completion handler that doubles as a task: call() blocks until the handler has been completed or failed.
 * Submitting it as a task therefore keeps the task pending until the completion callback has arrived.
 */
private static class CallableCompletionHandler implements Callable<Void>, CompletionHandler {
    final CountDownLatch done = new CountDownLatch(1);

    @Override
    public void completed() {
        done.countDown();
    }

    @Override
    public void failed(final Throwable t) {
        // The failure itself is not inspected; only the fact that a callback arrived is recorded.
        done.countDown();
    }

    /**
     * Waits for completed() or failed() to be called.
     *
     * @throws TimeoutException if neither callback arrives within 600 seconds
     */
    @Override
    public Void call() throws Exception {
        if (!done.await(600, TimeUnit.SECONDS)) {
            throw new TimeoutException();
        }
        return null;
    }
}
/**
 * Completion handler whose callbacks both throw a ConformanceException. The completion-exception tests use it
 * for response writes and closes.
 */
private static final CompletionHandler EXCEPTION_COMPLETION_HANDLER = new CompletionHandler() {
    @Override
    public void completed() {
        throw new ConformanceException();
    }

    @Override
    public void failed(final Throwable ignored) {
        throw new ConformanceException();
    }
};
}