// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; import org.mockito.Mockito; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.fail; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertArrayEquals; /** * @author Simon Thoresen Hult */ public class FastContentWriterTestCase { @Test public void requireThatContentCanBeWritten() throws ExecutionException, InterruptedException { ReadableContentChannel content = new ReadableContentChannel(); FastContentWriter out = new FastContentWriter(content); ByteBuffer foo = ByteBuffer.allocate(69); out.write(foo); ByteBuffer bar = ByteBuffer.allocate(69); out.write(bar); out.close(); assertFalse(out.isDone()); assertSame(foo, content.read()); assertFalse(out.isDone()); assertSame(bar, content.read()); assertFalse(out.isDone()); assertNull(content.read()); assertTrue(out.isDone()); } @Test public void requireThatStringsAreUtf8Encoded() { ReadableContentChannel content = new ReadableContentChannel(); FastContentWriter out = new FastContentWriter(content); String in = "\u6211\u80FD\u541E\u4E0B\u73BB\u7483\u800C\u4E0D\u4F24\u8EAB\u4F53\u3002"; out.write(in); out.close(); ByteBuffer buf = content.read(); byte[] arr = new byte[buf.remaining()]; buf.get(arr); assertArrayEquals(in.getBytes(StandardCharsets.UTF_8), arr); } @Test public void requireThatCancelThrowsUnsupportedOperation() { try { new FastContentWriter(Mockito.mock(ContentChannel.class)).cancel(true); fail(); } catch (UnsupportedOperationException e) { } } @Test public void requireThatCancelIsAlwaysFalse() { FastContentWriter writer = new FastContentWriter(Mockito.mock(ContentChannel.class)); assertFalse(writer.isCancelled()); try { writer.cancel(true); fail(); } catch (UnsupportedOperationException e) { } assertFalse(writer.isCancelled()); } @Test public void requireThatGetThrowsTimeoutUntilCloseCompletionHandlerIsCalled() throws Exception { ReadableContentChannel buf = new ReadableContentChannel(); FastContentWriter out = new FastContentWriter(buf); out.write(new byte[] { 6, 9 }); assertFalse(out.isDone()); try { out.get(100, TimeUnit.MILLISECONDS); fail(); } catch (TimeoutException e) { } assertNotNull(buf.read()); assertFalse(out.isDone()); try { out.get(100, TimeUnit.MILLISECONDS); fail(); } catch (TimeoutException e) { } out.close(); assertFalse(out.isDone()); try { out.get(100, TimeUnit.MILLISECONDS); fail(); } catch (TimeoutException e) { } assertNull(buf.read()); assertTrue(out.isDone()); assertTrue(out.get(600, TimeUnit.SECONDS)); assertTrue(out.get()); } @Test public void requireThatSyncWriteExceptionFailsFuture() throws InterruptedException { IllegalStateException expected = new IllegalStateException(); ContentChannel content = Mockito.mock(ContentChannel.class); Mockito.doThrow(expected) .when(content).write(Mockito.any(ByteBuffer.class), Mockito.any(CompletionHandler.class)); FastContentWriter out = new FastContentWriter(content); try { out.write("foo"); fail(); } catch (Throwable t) { assertSame(expected, t); } try { out.get(); fail(); } catch (ExecutionException e) { assertSame(expected, e.getCause()); } } @Test public void requireThatSyncCloseExceptionFailsFuture() throws InterruptedException { IllegalStateException expected = new IllegalStateException(); ContentChannel content = Mockito.mock(ContentChannel.class); Mockito.doThrow(expected) .when(content).close(Mockito.any(CompletionHandler.class)); FastContentWriter out = new FastContentWriter(content); try { out.close(); fail(); } catch (Throwable t) { assertSame(expected, t); } try { out.get(); fail(); } catch (ExecutionException e) { assertSame(expected, e.getCause()); } } @Test public void requireThatAsyncExceptionFailsFuture() throws InterruptedException { IllegalStateException expected = new IllegalStateException(); ReadableContentChannel content = new ReadableContentChannel(); FastContentWriter out = new FastContentWriter(content); out.write("foo"); content.failed(expected); try { out.get(); fail(); } catch (ExecutionException e) { assertSame(expected, e.getCause()); } } @Test public void requireThatWriterCanBeListenedTo() throws InterruptedException { ReadableContentChannel buf = new ReadableContentChannel(); FastContentWriter out = new FastContentWriter(buf); RunnableLatch listener = new RunnableLatch(); out.addListener(listener, MoreExecutors.directExecutor()); out.write(new byte[] { 6, 9 }); assertFalse(listener.await(100, TimeUnit.MILLISECONDS)); assertNotNull(buf.read()); assertFalse(listener.await(100, TimeUnit.MILLISECONDS)); out.close(); assertFalse(listener.await(100, TimeUnit.MILLISECONDS)); assertNull(buf.read()); assertTrue(listener.await(600, TimeUnit.SECONDS)); } @Test public void requireThatWriterIsThreadSafe() throws Exception { final CountDownLatch latch = new CountDownLatch(2); final ReadableContentChannel content = new ReadableContentChannel(); Future read = Executors.newSingleThreadExecutor().submit(new Callable() { @Override public Integer call() throws Exception { latch.countDown(); latch.await(600, TimeUnit.SECONDS); int bufCnt = 0; while (content.read() != null) { ++bufCnt; } return bufCnt; } }); Future write = Executors.newSingleThreadExecutor().submit(new Callable() { @Override public Integer call() throws Exception { FastContentWriter out = new FastContentWriter(content); ByteBuffer buf = ByteBuffer.wrap(new byte[69]); int bufCnt = 4096 + new Random().nextInt(4096); latch.countDown(); latch.await(600, TimeUnit.SECONDS); for (int i = 0; i < bufCnt; ++i) { out.write(buf.slice()); } out.close(); return bufCnt; } }); assertEquals(read.get(600, TimeUnit.SECONDS), write.get(600, TimeUnit.SECONDS)); } }