From 18e9f06334c30ed4d73eeabaa0e6205e79c56c82 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 3 Jun 2020 15:45:31 +0200 Subject: Split out thread pool implementation to separate classes --- .../container/handler/ThreadPoolProvider.java | 186 +-------------------- .../handler/threadpool/ContainerThreadPool.java | 87 ++++++++++ .../handler/threadpool/ExecutorServiceWrapper.java | 94 +++++++++++ .../WorkerCompletionTimingThreadPoolExecutor.java | 63 +++++++ .../container/handler/threadpool/package-info.java | 8 + .../handler/ThreadPoolProviderTestCase.java | 168 ------------------- .../threadpool/ContainerThreadPoolTest.java | 164 ++++++++++++++++++ 7 files changed, 422 insertions(+), 348 deletions(-) create mode 100644 container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java create mode 100644 container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java create mode 100644 container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java create mode 100644 container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java delete mode 100644 container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java create mode 100644 container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java index 0e786cfbc8f..958e958456c 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java +++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java @@ -1,26 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.handler; -import com.google.common.util.concurrent.ForwardingExecutorService; -import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; -import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.di.componentgraph.Provider; +import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.protect.ProcessTerminator; import com.yahoo.jdisc.Metric; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * A configurable thread pool provider. This provides the worker threads used for normal request processing. @@ -32,40 +19,14 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ThreadPoolProvider extends AbstractComponent implements Provider { - private final ExecutorServiceWrapper threadpool; + private final ContainerThreadPool threadpool; - private static BlockingQueue createQ(int queueSize, int maxThreads) { - return (queueSize == 0) - ? new SynchronousQueue<>(false) - : (queueSize < 0) - ? new ArrayBlockingQueue<>(maxThreads*4) - : new ArrayBlockingQueue<>(queueSize); - } - - private static int computeThreadPoolSize(int maxNumThreads) { - return (maxNumThreads <= 0) - ? Runtime.getRuntime().availableProcessors() * 4 - : maxNumThreads; - } - @Inject public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric) { - this(threadpoolConfig, metric, new ProcessTerminator()); + this.threadpool = new ContainerThreadPool(threadpoolConfig, metric); } public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) { - int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads()); - WorkerCompletionTimingThreadPoolExecutor executor = - new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads, - 0L, TimeUnit.SECONDS, - createQ(threadpoolConfig.queueSize(), maxNumThreads), - ThreadFactoryFactory.getThreadFactory("threadpool"), - metric); - // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also - // get the dreaded thread locals initialized even if they will never run. - // That counters what we we want to achieve with the Q that will prefer thread locality. - executor.prestartAllCoreThreads(); - threadpool = new ExecutorServiceWrapper(executor, metric, processTerminator, - threadpoolConfig.maxThreadExecutionTimeSeconds() * 1000L); + this.threadpool = new ContainerThreadPool(threadpoolConfig, metric, processTerminator); } /** @@ -75,7 +36,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider maxThreadExecutionTimeMillis) - processTerminator.logAndDie("No worker threads have been available for " + - timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true); - throw e; - } - } - - @Override - protected ExecutorService delegate() { return wrapped; } - - private static final class MetricNames { - private static final String REJECTED_REQUEST = "serverRejectedRequests"; - private static final String THREAD_POOL_SIZE = "serverThreadPoolSize"; - private static final String ACTIVE_THREADS = "serverActiveThreads"; - } - - } - - /** - * A thread pool executor which maintains the last time a worker completed - * package private for testing - **/ - final static class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor { - - private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions"; - - volatile long lastThreadAssignmentTimeMillis = System.currentTimeMillis(); - private final AtomicLong startedCount = new AtomicLong(0); - private final AtomicLong completedCount = new AtomicLong(0); - private final Metric metric; - - public WorkerCompletionTimingThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue workQueue, - ThreadFactory threadFactory, - Metric metric) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - this.metric = metric; - } - - @Override - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); - lastThreadAssignmentTimeMillis = System.currentTimeMillis(); - startedCount.incrementAndGet(); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - completedCount.incrementAndGet(); - if (t != null) { - metric.add(UNHANDLED_EXCEPTIONS_METRIC, 1L, metric.createContext(Map.of("exception", t.getClass().getSimpleName()))); - } - } - - @Override - public int getActiveCount() { - return (int)(startedCount.get() - completedCount.get()); - } + threadpool.deconstruct(); } } diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java new file mode 100644 index 00000000000..0f3be65f85f --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java @@ -0,0 +1,87 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.handler.threadpool; + +import com.google.inject.Inject; +import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.container.handler.ThreadpoolConfig; +import com.yahoo.container.protect.ProcessTerminator; +import com.yahoo.jdisc.Metric; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +/** + * A configurable thread pool. This provides the worker threads used for normal request processing. + * + * @author Steinar Knutsen + * @author baldersheim + * @author bratseth + * @author bjorncs + */ +public class ContainerThreadPool extends AbstractComponent implements AutoCloseable { + + private final ExecutorServiceWrapper threadpool; + + @Inject + public ContainerThreadPool(ThreadpoolConfig config, Metric metric) { + this(config, metric, new ProcessTerminator()); + } + + public ContainerThreadPool(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) { + int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads()); + WorkerCompletionTimingThreadPoolExecutor executor = + new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads, + 0L, TimeUnit.SECONDS, + createQ(threadpoolConfig.queueSize(), maxNumThreads), + ThreadFactoryFactory.getThreadFactory("threadpool"), + metric); + // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also + // get the dreaded thread locals initialized even if they will never run. + // That counters what we we want to achieve with the Q that will prefer thread locality. + executor.prestartAllCoreThreads(); + threadpool = new ExecutorServiceWrapper(executor, metric, processTerminator, + threadpoolConfig.maxThreadExecutionTimeSeconds() * 1000L); + } + + public Executor executor() { return threadpool; } + @Override public void deconstruct() { closeInternal(); } + @Override public void close() { closeInternal(); } + + /** + * Shutdown the thread pool, give a grace period of 1 second before forcibly + * shutting down all worker threads. + */ + private void closeInternal() { + boolean terminated; + + super.deconstruct(); + threadpool.shutdown(); + try { + terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + if (!terminated) { + threadpool.shutdownNow(); + } + } + + private static BlockingQueue createQ(int queueSize, int maxThreads) { + return (queueSize == 0) + ? new SynchronousQueue<>(false) + : (queueSize < 0) + ? new ArrayBlockingQueue<>(maxThreads*4) + : new ArrayBlockingQueue<>(queueSize); + } + + private static int computeThreadPoolSize(int maxNumThreads) { + return (maxNumThreads <= 0) + ? Runtime.getRuntime().availableProcessors() * 4 + : maxNumThreads; + } +} diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java new file mode 100644 index 00000000000..f7b0a22120a --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java @@ -0,0 +1,94 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.handler.threadpool; + +import com.google.common.util.concurrent.ForwardingExecutorService; +import com.yahoo.container.protect.ProcessTerminator; +import com.yahoo.jdisc.Metric; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A service executor wrapper which emits metrics and + * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state. + * Package private for testing + * + * @author Steinar Knutsen + * @author baldersheim + * @author bratseth + */ +class ExecutorServiceWrapper extends ForwardingExecutorService { + + private final WorkerCompletionTimingThreadPoolExecutor wrapped; + private final Metric metric; + private final ProcessTerminator processTerminator; + private final long maxThreadExecutionTimeMillis; + private final Thread metricReporter; + private final AtomicBoolean closed = new AtomicBoolean(false); + + ExecutorServiceWrapper( + WorkerCompletionTimingThreadPoolExecutor wrapped, + Metric metric, ProcessTerminator processTerminator, + long maxThreadExecutionTimeMillis) { + this.wrapped = wrapped; + this.metric = metric; + this.processTerminator = processTerminator; + this.maxThreadExecutionTimeMillis = maxThreadExecutionTimeMillis; + + metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null); + metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null); + metric.add(MetricNames.REJECTED_REQUEST, 0, null); + metricReporter = new Thread(this::reportMetrics); + metricReporter.setDaemon(true); + metricReporter.start(); + } + + private final void reportMetrics() { + try { + while (!closed.get()) { + metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null); + metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null); + Thread.sleep(100); + } + } catch (InterruptedException e) { } + } + + @Override + public void shutdown() { + super.shutdown(); + closed.set(true); + } + + /** + * Tracks all instances of {@link RejectedExecutionException}. + * {@link ContainerThreadPool} returns an executor, so external uses will not + * have access to the methods declared by {@link ExecutorService}. + * ({@link Executor#execute(Runnable)} is declared by {@link Executor}.) + */ + @Override + public void execute(Runnable command) { + try { + super.execute(command); + } catch (RejectedExecutionException e) { + metric.add(MetricNames.REJECTED_REQUEST, 1, null); + long timeSinceLastReturnedThreadMillis = System.currentTimeMillis() - wrapped.lastThreadAssignmentTimeMillis; + if (timeSinceLastReturnedThreadMillis > maxThreadExecutionTimeMillis) + processTerminator.logAndDie("No worker threads have been available for " + + timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true); + throw e; + } + } + + @Override + protected ExecutorService delegate() { return wrapped; } + + private static final class MetricNames { + private static final String REJECTED_REQUEST = "serverRejectedRequests"; + private static final String THREAD_POOL_SIZE = "serverThreadPoolSize"; + private static final String ACTIVE_THREADS = "serverActiveThreads"; + } + +} + diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java new file mode 100644 index 00000000000..9742e7ecfc3 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java @@ -0,0 +1,63 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.handler.threadpool; + +import com.yahoo.jdisc.Metric; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A thread pool executor which maintains the last time a worker completed + * package private for testing + * + * @author Steinar Knutsen + * @author baldersheim + * @author bratseth + */ +class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor { + + private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions"; + + volatile long lastThreadAssignmentTimeMillis = System.currentTimeMillis(); + private final AtomicLong startedCount = new AtomicLong(0); + private final AtomicLong completedCount = new AtomicLong(0); + private final Metric metric; + + WorkerCompletionTimingThreadPoolExecutor( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + Metric metric) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + this.metric = metric; + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + lastThreadAssignmentTimeMillis = System.currentTimeMillis(); + startedCount.incrementAndGet(); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + completedCount.incrementAndGet(); + if (t != null) { + metric.add(UNHANDLED_EXCEPTIONS_METRIC, 1L, metric.createContext(Map.of("exception", t.getClass().getSimpleName()))); + } + } + + @Override + public int getActiveCount() { + return (int)(startedCount.get() - completedCount.get()); + } +} + diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java new file mode 100644 index 00000000000..6a94cea49da --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java @@ -0,0 +1,8 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * @author bjorncs + */ +@ExportPackage +package com.yahoo.container.handler.threadpool; + +import com.yahoo.osgi.annotation.ExportPackage; \ No newline at end of file diff --git a/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java b/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java deleted file mode 100644 index 761ed40763c..00000000000 --- a/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.container.handler; - -import static org.junit.Assert.fail; - -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; - -import com.yahoo.container.protect.ProcessTerminator; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.Mockito; - -import com.yahoo.concurrent.Receiver; -import com.yahoo.concurrent.Receiver.MessageState; -import com.yahoo.collections.Tuple2; -import com.yahoo.jdisc.Metric; - -import static org.junit.Assert.assertEquals; - -/** - * Check threadpool provider accepts tasks and shuts down properly. - * - * @author Steinar Knutsen - */ -public class ThreadPoolProviderTestCase { - - @Test - public final void testThreadPoolProvider() throws InterruptedException { - ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(1)); - ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class)); - Executor exec = provider.get(); - Tuple2 reply; - FlipIt command = new FlipIt(); - for (boolean done = false; !done;) { - try { - exec.execute(command); - done = true; - } catch (RejectedExecutionException e) { - // just try again - } - } - reply = command.didItRun.get(5 * 60 * 1000); - if (reply.first != MessageState.VALID) { - fail("Executor task probably timed out, five minutes should be enough to flip a boolean."); - } - if (reply.second != Boolean.TRUE) { - fail("Executor task seemed to run, but did not get correct value."); - } - provider.deconstruct(); - command = new FlipIt(); - try { - exec.execute(command); - } catch (final RejectedExecutionException e) { - // this is what should happen - return; - } - fail("Pool did not reject tasks after shutdown."); - } - - private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { - ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(maxThreads).queueSize(queueSize)); - ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class)); - ThreadPoolProvider.ExecutorServiceWrapper wrapper = (ThreadPoolProvider.ExecutorServiceWrapper) provider.get(); - ThreadPoolProvider.WorkerCompletionTimingThreadPoolExecutor executor = (ThreadPoolProvider.WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); - return executor; - } - - @Test - public void testThatThreadPoolSizeFollowsConfig() { - ThreadPoolExecutor executor = createPool(3, 9); - assertEquals(3, executor.getMaximumPoolSize()); - assertEquals(9, executor.getQueue().remainingCapacity()); - } - @Test - public void testThatThreadPoolSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(0, 0); - assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); - assertEquals(0, executor.getQueue().remainingCapacity()); - } - @Test - public void testThatQueueSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(3, -1); - assertEquals(3, executor.getMaximumPoolSize()); - assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); - } - @Test - public void testThatThreadPoolSizeAndQueueSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(0, -1); - assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); - assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); - } - - private class FlipIt implements Runnable { - public final Receiver didItRun = new Receiver<>(); - - @Override - public void run() { - didItRun.put(Boolean.TRUE); - } - } - - @Test - @Ignore // Ignored because it depends on the system time and so is unstable on factory - public void testThreadPoolProviderTerminationOnBreakdown() throws InterruptedException { - ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(2) - .maxThreadExecutionTimeSeconds(1)); - MockProcessTerminator terminator = new MockProcessTerminator(); - ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class), terminator); - - // No dying when threads hang shorter than max thread execution time - provider.get().execute(new Hang(500)); - provider.get().execute(new Hang(500)); - assertEquals(0, terminator.dieRequests); - assertRejected(provider, new Hang(500)); // no more threads - assertEquals(0, terminator.dieRequests); // ... but not for long enough yet - try { Thread.sleep(1500); } catch (InterruptedException e) {} - provider.get().execute(new Hang(1)); - assertEquals(0, terminator.dieRequests); - try { Thread.sleep(50); } catch (InterruptedException e) {} // Make sure both threads are available - - // Dying when hanging both thread pool threads for longer than max thread execution time - provider.get().execute(new Hang(2000)); - provider.get().execute(new Hang(2000)); - assertEquals(0, terminator.dieRequests); - assertRejected(provider, new Hang(2000)); // no more threads - assertEquals(0, terminator.dieRequests); // ... but not for long enough yet - try { Thread.sleep(1500); } catch (InterruptedException e) {} - assertRejected(provider, new Hang(2000)); // no more threads - assertEquals(1, terminator.dieRequests); // ... for longer than maxThreadExecutionTime - } - - private void assertRejected(ThreadPoolProvider provider, Runnable task) { - try { - provider.get().execute(task); - fail("Expected execution rejected"); - } catch (final RejectedExecutionException expected) { - } - } - - private class Hang implements Runnable { - - private final long hangMillis; - - public Hang(int hangMillis) { - this.hangMillis = hangMillis; - } - - @Override - public void run() { - try { Thread.sleep(hangMillis); } catch (InterruptedException e) {} - } - - } - - private static class MockProcessTerminator extends ProcessTerminator { - - public volatile int dieRequests = 0; - - @Override - public void logAndDie(String message, boolean dumpThreads) { - dieRequests++; - } - - } - -} diff --git a/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java b/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java new file mode 100644 index 00000000000..7998bbc4872 --- /dev/null +++ b/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java @@ -0,0 +1,164 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.handler.threadpool; + +import com.yahoo.collections.Tuple2; +import com.yahoo.concurrent.Receiver; +import com.yahoo.container.handler.ThreadpoolConfig; +import com.yahoo.container.protect.ProcessTerminator; +import com.yahoo.jdisc.Metric; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author Steinar Knutsen + * @author bjorncs + */ +public class ContainerThreadPoolTest { + @Test + public final void testThreadPool() throws InterruptedException { + ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(1)); + ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class)); + Executor exec = threadPool.executor(); + Tuple2 reply; + FlipIt command = new FlipIt(); + for (boolean done = false; !done;) { + try { + exec.execute(command); + done = true; + } catch (RejectedExecutionException e) { + // just try again + } + } + reply = command.didItRun.get(5 * 60 * 1000); + if (reply.first != Receiver.MessageState.VALID) { + fail("Executor task probably timed out, five minutes should be enough to flip a boolean."); + } + if (reply.second != Boolean.TRUE) { + fail("Executor task seemed to run, but did not get correct value."); + } + threadPool.deconstruct(); + command = new FlipIt(); + try { + exec.execute(command); + } catch (final RejectedExecutionException e) { + // this is what should happen + return; + } + fail("Pool did not reject tasks after shutdown."); + } + + private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { + ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(maxThreads).queueSize(queueSize)); + ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class)); + ExecutorServiceWrapper wrapper = (ExecutorServiceWrapper) threadPool.executor(); + WorkerCompletionTimingThreadPoolExecutor executor = (WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); + return executor; + } + + @Test + public void testThatThreadPoolSizeFollowsConfig() { + ThreadPoolExecutor executor = createPool(3, 9); + assertEquals(3, executor.getMaximumPoolSize()); + assertEquals(9, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatThreadPoolSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(0, 0); + assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(0, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatQueueSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(3, -1); + assertEquals(3, executor.getMaximumPoolSize()); + assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatThreadPoolSizeAndQueueSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(0, -1); + assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + } + + private class FlipIt implements Runnable { + public final Receiver didItRun = new Receiver<>(); + + @Override + public void run() { + didItRun.put(Boolean.TRUE); + } + } + + @Test + @Ignore // Ignored because it depends on the system time and so is unstable on factory + public void testThreadPoolTerminationOnBreakdown() throws InterruptedException { + ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(2) + .maxThreadExecutionTimeSeconds(1)); + MockProcessTerminator terminator = new MockProcessTerminator(); + ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class), terminator); + + // No dying when threads hang shorter than max thread execution time + threadPool.executor().execute(new Hang(500)); + threadPool.executor().execute(new Hang(500)); + assertEquals(0, terminator.dieRequests); + assertRejected(threadPool, new Hang(500)); // no more threads + assertEquals(0, terminator.dieRequests); // ... but not for long enough yet + try { Thread.sleep(1500); } catch (InterruptedException e) {} + threadPool.executor().execute(new Hang(1)); + assertEquals(0, terminator.dieRequests); + try { Thread.sleep(50); } catch (InterruptedException e) {} // Make sure both threads are available + + // Dying when hanging both thread pool threads for longer than max thread execution time + threadPool.executor().execute(new Hang(2000)); + threadPool.executor().execute(new Hang(2000)); + assertEquals(0, terminator.dieRequests); + assertRejected(threadPool, new Hang(2000)); // no more threads + assertEquals(0, terminator.dieRequests); // ... but not for long enough yet + try { Thread.sleep(1500); } catch (InterruptedException e) {} + assertRejected(threadPool, new Hang(2000)); // no more threads + assertEquals(1, terminator.dieRequests); // ... for longer than maxThreadExecutionTime + } + + private void assertRejected(ContainerThreadPool threadPool, Runnable task) { + try { + threadPool.executor().execute(task); + fail("Expected execution rejected"); + } catch (final RejectedExecutionException expected) { + } + } + + private class Hang implements Runnable { + + private final long hangMillis; + + public Hang(int hangMillis) { + this.hangMillis = hangMillis; + } + + @Override + public void run() { + try { Thread.sleep(hangMillis); } catch (InterruptedException e) {} + } + + } + + private static class MockProcessTerminator extends ProcessTerminator { + + public volatile int dieRequests = 0; + + @Override + public void logAndDie(String message, boolean dumpThreads) { + dieRequests++; + } + + } + +} \ No newline at end of file -- cgit v1.2.3