From 21b36d7313ff9f02c93f85964ba682e8bc5dd148 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Mon, 21 Sep 2020 16:11:30 +0200 Subject: Make ContainerThreadPool an interface --- .../threadpool/ContainerThreadPoolTest.java | 165 --------------------- .../threadpool/DefaultContainerThreadPoolTest.java | 165 +++++++++++++++++++++ 2 files changed, 165 insertions(+), 165 deletions(-) delete mode 100644 container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java create mode 100644 container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java (limited to 'container-core/src/test/java/com/yahoo/container/handler') diff --git a/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java b/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java deleted file mode 100644 index 02e791099ed..00000000000 --- a/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.container.handler.threadpool; - -import com.yahoo.collections.Tuple2; -import com.yahoo.concurrent.Receiver; -import com.yahoo.container.protect.ProcessTerminator; -import com.yahoo.jdisc.Metric; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * @author Steinar Knutsen - * @author bjorncs - */ -public class ContainerThreadPoolTest { - @Test - public final void testThreadPool() throws InterruptedException { - ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(1)); - ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class)); - Executor exec = threadPool.executor(); - Tuple2 reply; - FlipIt command = new FlipIt(); - for (boolean done = false; !done;) { - try { - exec.execute(command); - done = true; - } catch (RejectedExecutionException e) { - // just try again - } - } - reply = command.didItRun.get(5 * 60 * 1000); - if (reply.first != Receiver.MessageState.VALID) { - fail("Executor task probably timed out, five minutes should be enough to flip a boolean."); - } - if (reply.second != Boolean.TRUE) { - fail("Executor task seemed to run, but did not get correct value."); - } - threadPool.close(); - command = new FlipIt(); - try { - exec.execute(command); - } catch (final RejectedExecutionException e) { - // this is what should happen - return; - } - fail("Pool did not reject tasks after shutdown."); - } - - private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { - ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(maxThreads).queueSize(queueSize)); - ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class)); - ExecutorServiceWrapper wrapper = (ExecutorServiceWrapper) threadPool.executor(); - WorkerCompletionTimingThreadPoolExecutor executor = (WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); - return executor; - } - - @Test - public void testThatThreadPoolSizeFollowsConfig() { - ThreadPoolExecutor executor = createPool(3, 9); - assertEquals(3, executor.getMaximumPoolSize()); - assertEquals(9, executor.getQueue().remainingCapacity()); - } - @Test - public void testThatThreadPoolSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(0, 0); - assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); - assertEquals(0, executor.getQueue().remainingCapacity()); - } - @Test - public void testThatQueueSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(3, -1); - assertEquals(3, executor.getMaximumPoolSize()); - assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); - } - @Test - public void testThatThreadPoolSizeAndQueueSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(0, -1); - assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); - assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); - } - - private class FlipIt implements Runnable { - public final Receiver didItRun = new Receiver<>(); - - @Override - public void run() { - didItRun.put(Boolean.TRUE); - } - } - - @Test - @Ignore // Ignored because it depends on the system time and so is unstable on factory - public void testThreadPoolTerminationOnBreakdown() throws InterruptedException { - ContainerThreadpoolConfig config = new ContainerThreadpoolConfig( - new ContainerThreadpoolConfig.Builder() - .maxThreads(2) - .maxThreadExecutionTimeSeconds(1)); - MockProcessTerminator terminator = new MockProcessTerminator(); - ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class), terminator); - - // No dying when threads hang shorter than max thread execution time - threadPool.executor().execute(new Hang(500)); - threadPool.executor().execute(new Hang(500)); - assertEquals(0, terminator.dieRequests); - assertRejected(threadPool, new Hang(500)); // no more threads - assertEquals(0, terminator.dieRequests); // ... but not for long enough yet - try { Thread.sleep(1500); } catch (InterruptedException e) {} - threadPool.executor().execute(new Hang(1)); - assertEquals(0, terminator.dieRequests); - try { Thread.sleep(50); } catch (InterruptedException e) {} // Make sure both threads are available - - // Dying when hanging both thread pool threads for longer than max thread execution time - threadPool.executor().execute(new Hang(2000)); - threadPool.executor().execute(new Hang(2000)); - assertEquals(0, terminator.dieRequests); - assertRejected(threadPool, new Hang(2000)); // no more threads - assertEquals(0, terminator.dieRequests); // ... but not for long enough yet - try { Thread.sleep(1500); } catch (InterruptedException e) {} - assertRejected(threadPool, new Hang(2000)); // no more threads - assertEquals(1, terminator.dieRequests); // ... for longer than maxThreadExecutionTime - } - - private void assertRejected(ContainerThreadPool threadPool, Runnable task) { - try { - threadPool.executor().execute(task); - fail("Expected execution rejected"); - } catch (final RejectedExecutionException expected) { - } - } - - private class Hang implements Runnable { - - private final long hangMillis; - - public Hang(int hangMillis) { - this.hangMillis = hangMillis; - } - - @Override - public void run() { - try { Thread.sleep(hangMillis); } catch (InterruptedException e) {} - } - - } - - private static class MockProcessTerminator extends ProcessTerminator { - - public volatile int dieRequests = 0; - - @Override - public void logAndDie(String message, boolean dumpThreads) { - dieRequests++; - } - - } - -} \ No newline at end of file diff --git a/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java b/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java new file mode 100644 index 00000000000..8b1ed12c796 --- /dev/null +++ b/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java @@ -0,0 +1,165 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.handler.threadpool; + +import com.yahoo.collections.Tuple2; +import com.yahoo.concurrent.Receiver; +import com.yahoo.container.protect.ProcessTerminator; +import com.yahoo.jdisc.Metric; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author Steinar Knutsen + * @author bjorncs + */ +public class DefaultContainerThreadPoolTest { + @Test + public final void testThreadPool() throws InterruptedException { + ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(1)); + ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class)); + Executor exec = threadPool.executor(); + Tuple2 reply; + FlipIt command = new FlipIt(); + for (boolean done = false; !done;) { + try { + exec.execute(command); + done = true; + } catch (RejectedExecutionException e) { + // just try again + } + } + reply = command.didItRun.get(5 * 60 * 1000); + if (reply.first != Receiver.MessageState.VALID) { + fail("Executor task probably timed out, five minutes should be enough to flip a boolean."); + } + if (reply.second != Boolean.TRUE) { + fail("Executor task seemed to run, but did not get correct value."); + } + threadPool.close(); + command = new FlipIt(); + try { + exec.execute(command); + } catch (final RejectedExecutionException e) { + // this is what should happen + return; + } + fail("Pool did not reject tasks after shutdown."); + } + + private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { + ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(maxThreads).queueSize(queueSize)); + ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class)); + ExecutorServiceWrapper wrapper = (ExecutorServiceWrapper) threadPool.executor(); + WorkerCompletionTimingThreadPoolExecutor executor = (WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); + return executor; + } + + @Test + public void testThatThreadPoolSizeFollowsConfig() { + ThreadPoolExecutor executor = createPool(3, 9); + assertEquals(3, executor.getMaximumPoolSize()); + assertEquals(9, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatThreadPoolSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(0, 0); + assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(0, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatQueueSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(3, -1); + assertEquals(3, executor.getMaximumPoolSize()); + assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatThreadPoolSizeAndQueueSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(0, -1); + assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + } + + private class FlipIt implements Runnable { + public final Receiver didItRun = new Receiver<>(); + + @Override + public void run() { + didItRun.put(Boolean.TRUE); + } + } + + @Test + @Ignore // Ignored because it depends on the system time and so is unstable on factory + public void testThreadPoolTerminationOnBreakdown() throws InterruptedException { + ContainerThreadpoolConfig config = new ContainerThreadpoolConfig( + new ContainerThreadpoolConfig.Builder() + .maxThreads(2) + .maxThreadExecutionTimeSeconds(1)); + MockProcessTerminator terminator = new MockProcessTerminator(); + ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class), terminator); + + // No dying when threads hang shorter than max thread execution time + threadPool.executor().execute(new Hang(500)); + threadPool.executor().execute(new Hang(500)); + assertEquals(0, terminator.dieRequests); + assertRejected(threadPool, new Hang(500)); // no more threads + assertEquals(0, terminator.dieRequests); // ... but not for long enough yet + try { Thread.sleep(1500); } catch (InterruptedException e) {} + threadPool.executor().execute(new Hang(1)); + assertEquals(0, terminator.dieRequests); + try { Thread.sleep(50); } catch (InterruptedException e) {} // Make sure both threads are available + + // Dying when hanging both thread pool threads for longer than max thread execution time + threadPool.executor().execute(new Hang(2000)); + threadPool.executor().execute(new Hang(2000)); + assertEquals(0, terminator.dieRequests); + assertRejected(threadPool, new Hang(2000)); // no more threads + assertEquals(0, terminator.dieRequests); // ... but not for long enough yet + try { Thread.sleep(1500); } catch (InterruptedException e) {} + assertRejected(threadPool, new Hang(2000)); // no more threads + assertEquals(1, terminator.dieRequests); // ... for longer than maxThreadExecutionTime + } + + private void assertRejected(ContainerThreadPool threadPool, Runnable task) { + try { + threadPool.executor().execute(task); + fail("Expected execution rejected"); + } catch (final RejectedExecutionException expected) { + } + } + + private class Hang implements Runnable { + + private final long hangMillis; + + public Hang(int hangMillis) { + this.hangMillis = hangMillis; + } + + @Override + public void run() { + try { Thread.sleep(hangMillis); } catch (InterruptedException e) {} + } + + } + + private static class MockProcessTerminator extends ProcessTerminator { + + public volatile int dieRequests = 0; + + @Override + public void logAndDie(String message, boolean dumpThreads) { + dieRequests++; + } + + } + +} \ No newline at end of file -- cgit v1.2.3