aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_core/src/test/java/com/yahoo/jdisc/core/TimeoutManagerImplTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'jdisc_core/src/test/java/com/yahoo/jdisc/core/TimeoutManagerImplTestCase.java')
-rw-r--r--jdisc_core/src/test/java/com/yahoo/jdisc/core/TimeoutManagerImplTestCase.java579
1 files changed, 579 insertions, 0 deletions
diff --git a/jdisc_core/src/test/java/com/yahoo/jdisc/core/TimeoutManagerImplTestCase.java b/jdisc_core/src/test/java/com/yahoo/jdisc/core/TimeoutManagerImplTestCase.java
new file mode 100644
index 00000000000..46464a9b05a
--- /dev/null
+++ b/jdisc_core/src/test/java/com/yahoo/jdisc/core/TimeoutManagerImplTestCase.java
@@ -0,0 +1,579 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jdisc.core;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.Container;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.TimeoutManager;
+import com.yahoo.jdisc.Timer;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.RequestDispatch;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.jdisc.test.NonWorkingRequest;
+import com.yahoo.jdisc.test.TestDriver;
+import org.junit.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TimeoutManagerImplTestCase {
+
+ private static final String REQUEST_URI = "http://host/path";
+
+ @Test
+ public void requireThatDefaultIsNoTimeout() {
+ Context ctx = new Context(MyRequestHandler.newEagerResponse());
+ assertNull(ctx.dispatchRequest(null, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutCanBeSetByServerProvider() {
+ Context ctx = new Context(MyRequestHandler.newEagerResponse());
+ assertEquals(Long.valueOf(69), ctx.dispatchRequest(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutCanBeSetByRequestHandler() {
+ Context ctx = new Context(MyRequestHandler.newTimeoutWithEagerResponse(69));
+ assertEquals(Long.valueOf(69), ctx.dispatchRequest(null, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutRequestHandlerTimeoutHasPrecedence() {
+ Context ctx = new Context(MyRequestHandler.newTimeoutWithEagerResponse(6));
+ assertEquals(Long.valueOf(6), ctx.dispatchRequest(9L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatResponseCancelsTimeout() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newEagerResponse());
+ assertEquals(Response.Status.OK, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertEquals(Response.Status.OK, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatNullRequestContentCanTimeout() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newNullContent());
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutWorksAfterRequestDenied() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newFirstRequestDenied());
+ try {
+ ctx.dispatchRequest(null, MyResponseHandler.newInstance());
+ fail();
+ } catch (RequestDeniedException e) {
+
+ }
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutWorksAfterResponseDenied() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newResponseDenied()));
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutWorksAfterResponseThrowsException() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newThrowException()));
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutWorksAfterResponseInterruptsThread() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInterruptThread()));
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatTimeoutOccursInOrder() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ MyResponseHandler foo = MyResponseHandler.newInstance();
+ ctx.dispatchRequest(300L, foo);
+
+ MyResponseHandler bar = MyResponseHandler.newInstance();
+ ctx.dispatchRequest(100L, bar);
+
+ MyResponseHandler baz = MyResponseHandler.newInstance();
+ ctx.dispatchRequest(200L, baz);
+
+ ctx.forwardToTime(100);
+ assertFalse(foo.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(bar.await(600, TimeUnit.SECONDS));
+ assertFalse(baz.await(10, TimeUnit.MILLISECONDS));
+
+ ctx.forwardToTime(200);
+ assertFalse(foo.await(10, TimeUnit.MILLISECONDS));
+ assertTrue(baz.await(600, TimeUnit.SECONDS));
+
+ ctx.forwardToTime(300);
+ assertTrue(foo.await(600, TimeUnit.SECONDS));
+
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatResponseHandlerIsWellBehavedAfterTimeout() throws InterruptedException {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ assertEquals(Response.Status.REQUEST_TIMEOUT, ctx.awaitResponse(69L, MyResponseHandler.newInstance()));
+
+ ContentChannel content = ctx.requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK));
+ assertNotNull(content);
+
+ content.write(ByteBuffer.allocate(69), null);
+ MyCompletion completion = new MyCompletion();
+ content.write(ByteBuffer.allocate(69), completion);
+ assertTrue(completion.completed.await(600, TimeUnit.SECONDS));
+
+ completion = new MyCompletion();
+ content.close(completion);
+ assertTrue(completion.completed.await(600, TimeUnit.SECONDS));
+
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatManagedHandlerForwardsAllCalls() throws InterruptedException {
+ Request request = NonWorkingRequest.newInstance(REQUEST_URI);
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ TimeoutManagerImpl timeoutManager = new TimeoutManagerImpl(Executors.defaultThreadFactory(),
+ new SystemTimer());
+ RequestHandler managedHandler = timeoutManager.manageHandler(requestHandler);
+
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ ContentChannel requestContent = managedHandler.handleRequest(request, responseHandler);
+ assertNotNull(requestContent);
+
+ ByteBuffer buf = ByteBuffer.allocate(69);
+ requestContent.write(buf, null);
+ assertSame(buf, requestHandler.content.buf);
+ MyCompletion writeCompletion = new MyCompletion();
+ requestContent.write(buf = ByteBuffer.allocate(69), writeCompletion);
+ assertSame(buf, requestHandler.content.buf);
+ requestHandler.content.writeCompletion.completed();
+ assertTrue(writeCompletion.completed.await(600, TimeUnit.SECONDS));
+
+ MyCompletion closeCompletion = new MyCompletion();
+ requestContent.close(closeCompletion);
+ requestHandler.content.closeCompletion.completed();
+ assertTrue(closeCompletion.completed.await(600, TimeUnit.SECONDS));
+
+ managedHandler.release();
+ assertTrue(requestHandler.destroyed);
+
+ Response response = new Response(Response.Status.OK);
+ ContentChannel responseContent = requestHandler.responseHandler.handleResponse(response);
+ assertNotNull(responseContent);
+
+ responseContent.write(buf = ByteBuffer.allocate(69), null);
+ assertSame(buf, responseHandler.content.buf);
+ responseContent.write(buf = ByteBuffer.allocate(69), writeCompletion = new MyCompletion());
+ assertSame(buf, responseHandler.content.buf);
+ responseHandler.content.writeCompletion.completed();
+ assertTrue(writeCompletion.completed.await(600, TimeUnit.SECONDS));
+
+ responseContent.close(closeCompletion = new MyCompletion());
+ responseHandler.content.closeCompletion.completed();
+ assertTrue(closeCompletion.completed.await(600, TimeUnit.SECONDS));
+
+ assertSame(response, responseHandler.response.get());
+ }
+
+ @Test
+ public void requireThatTimeoutOccursAtExpectedTime() throws InterruptedException {
+ final Context ctx = new Context(MyRequestHandler.newInstance());
+ final MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+
+ ctx.forwardToTime(100);
+ new RequestDispatch() {
+
+ @Override
+ protected Request newRequest() {
+ Request request = new Request(ctx.driver, URI.create(REQUEST_URI));
+ request.setTimeout(300, TimeUnit.MILLISECONDS);
+ return request;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ return responseHandler.handleResponse(response);
+ }
+ }.dispatch();
+
+ ctx.forwardToTime(300);
+ assertFalse(responseHandler.await(100, TimeUnit.MILLISECONDS));
+ ctx.forwardToTime(400);
+ assertTrue(responseHandler.await(600, TimeUnit.SECONDS));
+
+ Response response = responseHandler.response.get();
+ assertNotNull(response);
+ assertEquals(Response.Status.REQUEST_TIMEOUT, response.getStatus());
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatQueueEntryIsRemovedWhenResponseHandlerIsCalledBeforeTimeout() {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ ctx.dispatchRequest(69L, MyResponseHandler.newInstance());
+ assertTrue(ctx.awaitQueueSize(1, 600, TimeUnit.SECONDS));
+ ctx.requestHandler.respond();
+ assertTrue(ctx.awaitQueueSize(0, 600, TimeUnit.SECONDS));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatNoEntryIsMadeIfTimeoutIsNull() {
+ Context ctx = new Context(MyRequestHandler.newInstance());
+ ctx.dispatchRequest(null, MyResponseHandler.newInstance());
+ assertFalse(ctx.awaitQueueSize(1, 100, TimeUnit.MILLISECONDS));
+ assertTrue(ctx.awaitQueueSize(0, 600, TimeUnit.SECONDS));
+ ctx.requestHandler.respond();
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatNoEntryIsMadeIfHandleRequestCallsHandleResponse() {
+ Context ctx = new Context(MyRequestHandler.newEagerResponse());
+ ctx.dispatchRequest(69L, MyResponseHandler.newInstance());
+ assertFalse(ctx.awaitQueueSize(1, 100, TimeUnit.MILLISECONDS));
+ assertTrue(ctx.awaitQueueSize(0, 600, TimeUnit.SECONDS));
+ assertTrue(ctx.close());
+ }
+
+ @Test
+ public void requireThatNoEntryIsMadeIfTimeoutHandlerHasBeenSet() {
+ final Context ctx = new Context(MyRequestHandler.newInstance());
+ new RequestDispatch() {
+
+ @Override
+ protected Request newRequest() {
+ Request request = new Request(ctx.driver, URI.create(REQUEST_URI));
+ request.setTimeout(10, TimeUnit.MILLISECONDS);
+ request.setTimeoutManager(new TimeoutManager() {
+
+ @Override
+ public void scheduleTimeout(Request request) {
+
+ }
+ });
+ return request;
+ }
+ }.dispatch();
+
+ assertFalse(ctx.awaitQueueSize(1, 100, TimeUnit.MILLISECONDS));
+ assertTrue(ctx.awaitQueueSize(0, 600, TimeUnit.SECONDS));
+ ctx.requestHandler.respond();
+ assertTrue(ctx.close());
+ }
+
+ private static class Context implements Module, Timer {
+
+ final MyRequestHandler requestHandler;
+ final TimeoutManagerImpl timeoutManager;
+ final TestDriver driver;
+ long millis = 0;
+
+ Context(MyRequestHandler requestHandler) {
+ this.requestHandler = requestHandler;
+ this.driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(this);
+
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.serverBindings().bind(REQUEST_URI, requestHandler);
+ driver.activateContainer(builder);
+
+ Container ref = driver.newReference(URI.create(REQUEST_URI));
+ timeoutManager = ref.getInstance(TimeoutManagerImpl.class);
+ ref.release();
+ }
+
+ void forwardToTime(long millis) {
+ while (this.millis < millis) {
+ this.millis += ScheduledQueue.MILLIS_PER_SLOT;
+ timeoutManager.checkTasks(this.millis);
+ }
+ }
+
+ boolean close() {
+ return driver.close();
+ }
+
+ @Override
+ public void configure(Binder binder) {
+ binder.bind(Timer.class).toInstance(this);
+ }
+
+ @Override
+ public long currentTimeMillis() {
+ return millis;
+ }
+
+ int awaitResponse(Long serverProviderTimeout, MyResponseHandler responseHandler) throws InterruptedException {
+ Long timeout = new MyServerProvider(serverProviderTimeout).dispatchRequest(driver, responseHandler);
+ long timeoutAt;
+ if (timeout == null) {
+ timeoutAt = millis + TimeUnit.SECONDS.toMillis(120);
+ } else {
+ timeoutAt = millis + timeout;
+ }
+ forwardToTime(timeoutAt);
+ if (!responseHandler.await(600, TimeUnit.SECONDS)) {
+ fail("Request handler failed to respond within allocated time.");
+ }
+ return responseHandler.response.get().getStatus();
+ }
+
+ boolean awaitQueueSize(int expectedSize, int timeout, TimeUnit unit) {
+ for (long i = 0, len = unit.toMillis(timeout) / 100; i < len; ++i) {
+ if (timeoutManager.queueSize() == expectedSize) {
+ return true;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ fail();
+ }
+ }
+ return false;
+ }
+
+ public Long dispatchRequest(Long serverProviderTimeout, MyResponseHandler responseHandler) {
+ return new MyServerProvider(serverProviderTimeout).dispatchRequest(driver, responseHandler);
+ }
+ }
+
+ private static class MyServerProvider {
+
+ final Long timeout;
+
+ MyServerProvider(Long timeout) {
+ this.timeout = timeout;
+ }
+
+ Long dispatchRequest(CurrentContainer container, ResponseHandler responseHandler) {
+ Request request = null;
+ ContentChannel content = null;
+ try {
+ request = new Request(container, URI.create(REQUEST_URI));
+ if (timeout != null) {
+ request.setTimeout(timeout, TimeUnit.MILLISECONDS);
+ }
+ content = request.connect(responseHandler);
+ } finally {
+ if (request != null) {
+ request.release();
+ }
+ if (content != null) {
+ content.close(null);
+ }
+ }
+ return request.getTimeout(TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private static class MyCompletion implements CompletionHandler {
+
+ final CountDownLatch completed = new CountDownLatch(1);
+
+ @Override
+ public void completed() {
+ completed.countDown();
+ }
+
+ @Override
+ public void failed(Throwable t) {
+
+ }
+ }
+
+ private static class MyContent implements ContentChannel {
+
+ ByteBuffer buf;
+ CompletionHandler writeCompletion;
+ CompletionHandler closeCompletion;
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ this.buf = buf;
+ this.writeCompletion = handler;
+ if (handler != null) {
+ handler.completed();
+ }
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ this.closeCompletion = handler;
+ if (handler != null) {
+ handler.completed();
+ }
+ }
+
+ static MyContent newInstance() {
+ return new MyContent();
+ }
+ }
+
+ private static class MyResponseHandler implements ResponseHandler {
+
+ final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+ final AtomicReference<Response> response = new AtomicReference<>();
+ final MyContent content;
+ final boolean throwException;
+ final boolean interruptThread;
+
+ MyResponseHandler(MyContent content, boolean throwException, boolean interruptThread) {
+ this.content = content;
+ this.throwException = throwException;
+ this.interruptThread = interruptThread;
+ }
+
+ boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ return latch.get().await(timeout, unit);
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ if (this.response.getAndSet(response) != null) {
+ throw new IllegalStateException("Response already received.");
+ }
+ latch.get().countDown();
+ if (interruptThread) {
+ Thread.currentThread().interrupt();
+ }
+ if (throwException) {
+ throw new MyException();
+ }
+ return content;
+ }
+
+ static MyResponseHandler newInstance() {
+ return new MyResponseHandler(MyContent.newInstance(), false, false);
+ }
+
+ static MyResponseHandler newResponseDenied() {
+ return new MyResponseHandler(null, false, false);
+ }
+
+ static MyResponseHandler newThrowException() {
+ return new MyResponseHandler(null, true, false);
+ }
+
+ static MyResponseHandler newInterruptThread() {
+ return new MyResponseHandler(MyContent.newInstance(), false, true);
+ }
+ }
+
+ private static class MyRequestHandler extends AbstractResource implements RequestHandler {
+
+ final MyContent content;
+ final Long timeout;
+ int numDenied;
+ int numEager;
+ Request request = null;
+ ResponseHandler responseHandler = null;
+ boolean destroyed = false;
+
+ MyRequestHandler(int numDenied, MyContent content, Long timeout, int numEager) {
+ this.numDenied = numDenied;
+ this.content = content;
+ this.timeout = timeout;
+ this.numEager = numEager;
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request request, ResponseHandler handler) {
+ if (--numDenied >= 0) {
+ throw new RequestDeniedException(request);
+ }
+ this.request = request;
+ this.responseHandler = handler;
+ if (timeout != null) {
+ request.setTimeout(timeout, TimeUnit.MILLISECONDS);
+ }
+ if (--numEager >= 0) {
+ respond();
+ }
+ return content;
+ }
+
+ @Override
+ public void handleTimeout(Request request, ResponseHandler handler) {
+ Response.dispatchTimeout(handler);
+ }
+
+ @Override
+ protected void destroy() {
+ destroyed = true;
+ }
+
+ void respond() {
+ ContentChannel content = responseHandler.handleResponse(new Response(Response.Status.OK));
+ if (content != null) {
+ content.close(null);
+ }
+ }
+
+ static MyRequestHandler newInstance() {
+ return new MyRequestHandler(0, MyContent.newInstance(), null, 0);
+ }
+
+ static MyRequestHandler newTimeoutWithEagerResponse(long millis) {
+ return new MyRequestHandler(0, MyContent.newInstance(), millis, Integer.MAX_VALUE);
+ }
+
+ static MyRequestHandler newFirstRequestDenied() {
+ return new MyRequestHandler(1, MyContent.newInstance(), null, 0);
+ }
+
+ static MyRequestHandler newEagerResponse() {
+ return new MyRequestHandler(0, MyContent.newInstance(), null, Integer.MAX_VALUE);
+ }
+
+ public static MyRequestHandler newNullContent() {
+ return new MyRequestHandler(0, null, null, 0);
+ }
+ }
+
+ private static class MyException extends RuntimeException {
+
+ }
+}