diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-25 18:57:12 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-25 18:57:12 +0000 |
commit | c7b890535a3009e52e7227b883f2c9a16e59a740 (patch) | |
tree | bb68bdc548c6715a495e5aa910ea8e45ff1469ad /container-core | |
parent | 35decca41db6b9a44b24b6f7501c84d159ebd6a7 (diff) |
Let size of thread pool and Q follow number of cores on the machine where it is running.
If auto detected num worker threads will use number of #cores * 4, quesize will use #numWorkers * 4.
No changes of the default value in this commit.
Diffstat (limited to 'container-core')
4 files changed, 68 insertions, 6 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index 6d683c53984..82d34d88b99 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -171,6 +171,7 @@ "public void <init>()", "public void <init>(com.yahoo.container.handler.ThreadpoolConfig)", "public com.yahoo.container.handler.ThreadpoolConfig$Builder maxthreads(int)", + "public com.yahoo.container.handler.ThreadpoolConfig$Builder queueSize(int)", "public com.yahoo.container.handler.ThreadpoolConfig$Builder maxThreadExecutionTimeSeconds(int)", "public com.yahoo.container.handler.ThreadpoolConfig$Builder softStartSeconds(double)", "public final boolean dispatchGetConfig(com.yahoo.config.ConfigInstance$Producer)", @@ -210,6 +211,7 @@ "public static java.lang.String getDefVersion()", "public void <init>(com.yahoo.container.handler.ThreadpoolConfig$Builder)", "public int maxthreads()", + "public int queueSize()", "public int maxThreadExecutionTimeSeconds()", "public double softStartSeconds()" ], diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java index b427a58c9b7..0e786cfbc8f 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java +++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java @@ -10,6 +10,7 @@ import com.yahoo.container.protect.ProcessTerminator; import com.yahoo.jdisc.Metric; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -33,17 +34,30 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex private final ExecutorServiceWrapper threadpool; + private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) { + return (queueSize == 0) + ? new SynchronousQueue<>(false) + : (queueSize < 0) + ? new ArrayBlockingQueue<>(maxThreads*4) + : new ArrayBlockingQueue<>(queueSize); + } + + private static int computeThreadPoolSize(int maxNumThreads) { + return (maxNumThreads <= 0) + ? Runtime.getRuntime().availableProcessors() * 4 + : maxNumThreads; + } @Inject public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric) { this(threadpoolConfig, metric, new ProcessTerminator()); } public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) { + int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads()); WorkerCompletionTimingThreadPoolExecutor executor = - new WorkerCompletionTimingThreadPoolExecutor(threadpoolConfig.maxthreads(), - threadpoolConfig.maxthreads(), + new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads, 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), + createQ(threadpoolConfig.queueSize(), maxNumThreads), ThreadFactoryFactory.getThreadFactory("threadpool"), metric); // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also @@ -87,8 +101,9 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex /** * A service executor wrapper which emits metrics and * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state. + * Package private for testing */ - private final static class ExecutorServiceWrapper extends ForwardingExecutorService { + final static class ExecutorServiceWrapper extends ForwardingExecutorService { private final WorkerCompletionTimingThreadPoolExecutor wrapped; private final Metric metric; @@ -160,8 +175,11 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex } - /** A thread pool executor which maintains the last time a worker completed */ - private final static class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor { + /** + * A thread pool executor which maintains the last time a worker completed + * package private for testing + **/ + final static class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor { private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions"; diff --git a/container-core/src/main/resources/configdefinitions/threadpool.def b/container-core/src/main/resources/configdefinitions/threadpool.def index 9bb9badd9b5..abc60f9f06d 100644 --- a/container-core/src/main/resources/configdefinitions/threadpool.def +++ b/container-core/src/main/resources/configdefinitions/threadpool.def @@ -2,8 +2,16 @@ namespace=container.handler +## Num ber of thread in the thread pool +## Setting it to 0 or negative number will cause it to be set to #cores * 4 maxthreads int default=500 +## max queue size +## There can be queueSize + maxthreads requests inflight concurrently +## The container will start replying 503 +## Negative value will cause it to set to maxthreads*4 +queueSize int default=0 + # The max time the container tolerates having no threads available before it shuts down to # get out of a bad state. This should be set a bit higher than the expected max execution # time of each request when in a state of overload, i.e about "worst case execution time*2" diff --git a/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java b/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java index 918863f6dda..761ed40763c 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java +++ b/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java @@ -5,6 +5,7 @@ import static org.junit.Assert.fail; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import com.yahoo.container.protect.ProcessTerminator; import org.junit.Ignore; @@ -58,6 +59,39 @@ public class ThreadPoolProviderTestCase { fail("Pool did not reject tasks after shutdown."); } + private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { + ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(maxThreads).queueSize(queueSize)); + ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class)); + ThreadPoolProvider.ExecutorServiceWrapper wrapper = (ThreadPoolProvider.ExecutorServiceWrapper) provider.get(); + ThreadPoolProvider.WorkerCompletionTimingThreadPoolExecutor executor = (ThreadPoolProvider.WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); + return executor; + } + + @Test + public void testThatThreadPoolSizeFollowsConfig() { + ThreadPoolExecutor executor = createPool(3, 9); + assertEquals(3, executor.getMaximumPoolSize()); + assertEquals(9, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatThreadPoolSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(0, 0); + assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(0, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatQueueSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(3, -1); + assertEquals(3, executor.getMaximumPoolSize()); + assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + } + @Test + public void testThatThreadPoolSizeAndQueueSizeAutoDetected() { + ThreadPoolExecutor executor = createPool(0, -1); + assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + } + private class FlipIt implements Runnable { public final Receiver<Boolean> didItRun = new Receiver<>(); |