diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-09-30 14:57:23 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-09-30 14:57:23 +0200 |
commit | a30849486a1a6e4639193297b6972457ff242a89 (patch) | |
tree | c5bf505d15bb9bf07306f5f9a06aacc067b6e6df /container-core | |
parent | 9c113ad7ea0f5181f0d82933f6d08dc8f7637ef6 (diff) |
Scale jdisc threadpools with cpus available in JVM
Change config model to only determine the scaling factors for all threadpool configuration.
Diffstat (limited to 'container-core')
5 files changed, 68 insertions, 37 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java index a07898cb1d1..081cefbd7eb 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java +++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java @@ -38,10 +38,14 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex * as {@link ThreadpoolConfig} is currently public api. */ private static ContainerThreadpoolConfig translateConfig(ThreadpoolConfig config) { + int cpus = Runtime.getRuntime().availableProcessors(); + int max = config.maxthreads() >= 0 ? config.maxthreads() : Math.max(8, Math.abs(config.maxthreads()) * cpus); + int min = config.corePoolSize() >= 0 ? config.corePoolSize() : Math.max(8, Math.abs(config.corePoolSize()) * cpus); + return new ContainerThreadpoolConfig( new ContainerThreadpoolConfig.Builder() - .maxThreads(config.maxthreads()) - .minThreads(config.corePoolSize()) + .maxThreads(max) + .minThreads(min) .name(config.name()) .queueSize(config.queueSize()) .keepAliveTime(config.keepAliveTime()) diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java index 6bed4a6f442..1fbbdf6f13f 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java @@ -12,9 +12,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; /** - * Default implementation of {@link DefaultContainerThreadpool}. + * Default implementation of {@link ContainerThreadPool}. * * @author Steinar Knutsen * @author baldersheim @@ -23,6 +24,8 @@ import java.util.concurrent.TimeUnit; */ public class DefaultContainerThreadpool extends AbstractComponent implements AutoCloseable, ContainerThreadPool { + private static final Logger log = Logger.getLogger(DefaultContainerThreadpool.class.getName()); + private final ExecutorServiceWrapper threadpool; @Inject @@ -31,14 +34,23 @@ public class DefaultContainerThreadpool extends AbstractComponent implements Aut } public DefaultContainerThreadpool(ContainerThreadpoolConfig config, Metric metric, ProcessTerminator processTerminator) { - ThreadPoolMetric threadPoolMetric = new ThreadPoolMetric(metric, config.name()); - int maxNumThreads = computeMaximumThreadPoolSize(config.maxThreads()); - int coreNumThreads = computeCoreThreadPoolSize(config.minThreads(), maxNumThreads); + this(config, metric, processTerminator, Runtime.getRuntime().availableProcessors()); + } + + DefaultContainerThreadpool(ContainerThreadpoolConfig config, Metric metric, ProcessTerminator processTerminator, + int cpus) { + String name = config.name(); + int maxThreads = maxThreads(config, cpus); + int minThreads = minThreads(config, maxThreads, cpus); + int queueSize = queueSize(config, maxThreads); + log.info(String.format("Threadpool '%s': min=%d, max=%d, queue=%d", name, minThreads, maxThreads, queueSize)); + + ThreadPoolMetric threadPoolMetric = new ThreadPoolMetric(metric, name); WorkerCompletionTimingThreadPoolExecutor executor = - new WorkerCompletionTimingThreadPoolExecutor(coreNumThreads, maxNumThreads, + new WorkerCompletionTimingThreadPoolExecutor(minThreads, maxThreads, (int)config.keepAliveTime() * 1000, TimeUnit.MILLISECONDS, - createQ(config.queueSize(), maxNumThreads), - ThreadFactoryFactory.getThreadFactory(config.name()), + createQueue(queueSize), + ThreadFactoryFactory.getThreadFactory(name), threadPoolMetric); // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also // get the dreaded thread locals initialized even if they will never run. @@ -46,7 +58,7 @@ public class DefaultContainerThreadpool extends AbstractComponent implements Aut executor.prestartAllCoreThreads(); threadpool = new ExecutorServiceWrapper( executor, threadPoolMetric, processTerminator, config.maxThreadExecutionTimeSeconds() * 1000L, - config.name(), config.queueSize()); + name, queueSize); } @Override public Executor executor() { return threadpool; } @@ -74,24 +86,26 @@ public class DefaultContainerThreadpool extends AbstractComponent implements Aut } } - private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) { - return (queueSize == 0) - ? new SynchronousQueue<>(false) - : (queueSize < 0) - ? new ArrayBlockingQueue<>(maxThreads*4) - : new ArrayBlockingQueue<>(queueSize); + private static BlockingQueue<Runnable> createQueue(int size) { + return size == 0 ? new SynchronousQueue<>(false) : new ArrayBlockingQueue<>(size); + } + + private static int maxThreads(ContainerThreadpoolConfig config, int cpus) { + if (config.maxThreads() > 0) return config.maxThreads(); + else if (config.maxThreads() == 0) return 4 * cpus; + else return Math.abs(config.maxThreads()) * cpus; } - private static int computeMaximumThreadPoolSize(int maxNumThreads) { - return (maxNumThreads <= 0) - ? Runtime.getRuntime().availableProcessors() * 4 - : maxNumThreads; + private static int minThreads(ContainerThreadpoolConfig config, int max, int cpus) { + int threads; + if (config.minThreads() > 0) threads = config.minThreads(); + else if (config.minThreads() == 0) threads = 4 * cpus; + else threads = Math.abs(config.minThreads()) * cpus; + return Math.min(threads, max); } - private static int computeCoreThreadPoolSize(int corePoolSize, int maxNumThreads) { - return Math.min( - corePoolSize <= 0 ? Runtime.getRuntime().availableProcessors() * 2 : corePoolSize, - maxNumThreads); + private int queueSize(ContainerThreadpoolConfig config, int maxThreads) { + return config.queueSize() >= 0 ? config.queueSize() : Math.max(650, Math.abs(config.queueSize()) * maxThreads); } } diff --git a/container-core/src/main/resources/configdefinitions/container.handler.threadpool.container-threadpool.def b/container-core/src/main/resources/configdefinitions/container.handler.threadpool.container-threadpool.def index 9248bf2e2bf..4ba14c2da89 100644 --- a/container-core/src/main/resources/configdefinitions/container.handler.threadpool.container-threadpool.def +++ b/container-core/src/main/resources/configdefinitions/container.handler.threadpool.container-threadpool.def @@ -4,16 +4,19 @@ namespace=container.handler.threadpool ## Maximum number of thread in the thread pool ## 0 is translated to vcpu*4 +## Negative value is interpreted as scale factor ( vcpu*abs(maxThreads) ) maxThreads int default=0 ## Minimum number of thread in the thread pool ## 0 is translated to vcpu*2 +## Negative value is interpreted as scale factor ( vcpu*abs(minThreads) ) minThreads int default=0 ## The number of seconds that excess idle threads will wait for new tasks before terminating keepAliveTime double default=5.0 ## Max queue size +## Negative value is interpreted as scale factor ( effectiveMaxThreads*abs(queueSize) ) queueSize int default=0 ## The max time the container tolerates having no threads available before it shuts down to diff --git a/container-core/src/main/resources/configdefinitions/container.handler.threadpool.def b/container-core/src/main/resources/configdefinitions/container.handler.threadpool.def index d966738ea9f..e73ee2254fc 100644 --- a/container-core/src/main/resources/configdefinitions/container.handler.threadpool.def +++ b/container-core/src/main/resources/configdefinitions/container.handler.threadpool.def @@ -3,10 +3,13 @@ namespace=container.handler ## Maximum number of thread in the thread pool -## Setting it to 0 or negative number will cause it to be set to #cores * 4 +## 0 is translated to vcpu*4 +## Negative value is interpreted as scale factor ( vcpu*abs(maxThreads) ) maxthreads int default=500 -# The number of threads to keep in the pool, even if they are idle +## The number of threads to keep in the pool, even if they are idle +## 0 is translated to vcpu*4 +## Negative value is interpreted as scale factor ( vcpu*abs(corePoolSize) ) corePoolSize int default=500 # The number of seconds that excess idle threads will wait for new tasks before terminating diff --git a/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java b/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java index 8b1ed12c796..1d9c4b367bd 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java +++ b/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java @@ -21,6 +21,9 @@ import static org.junit.Assert.fail; * @author bjorncs */ public class DefaultContainerThreadPoolTest { + + private static final int CPUS = 16; + @Test public final void testThreadPool() throws InterruptedException { ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(1)); @@ -55,8 +58,12 @@ public class DefaultContainerThreadPoolTest { } private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { - ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(maxThreads).queueSize(queueSize)); - ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class)); + ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder() + .maxThreads(maxThreads) + .minThreads(maxThreads) + .queueSize(queueSize)); + ContainerThreadPool threadPool = new DefaultContainerThreadpool( + config, Mockito.mock(Metric.class), new MockProcessTerminator(), CPUS); ExecutorServiceWrapper wrapper = (ExecutorServiceWrapper) threadPool.executor(); WorkerCompletionTimingThreadPoolExecutor executor = (WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); return executor; @@ -64,27 +71,27 @@ public class DefaultContainerThreadPoolTest { @Test public void testThatThreadPoolSizeFollowsConfig() { - ThreadPoolExecutor executor = createPool(3, 9); + ThreadPoolExecutor executor = createPool(3, 1200); assertEquals(3, executor.getMaximumPoolSize()); - assertEquals(9, executor.getQueue().remainingCapacity()); + assertEquals(1200, executor.getQueue().remainingCapacity()); } @Test public void testThatThreadPoolSizeAutoDetected() { ThreadPoolExecutor executor = createPool(0, 0); - assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); + assertEquals(CPUS*4, executor.getMaximumPoolSize()); assertEquals(0, executor.getQueue().remainingCapacity()); } @Test public void testThatQueueSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(3, -1); - assertEquals(3, executor.getMaximumPoolSize()); - assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + ThreadPoolExecutor executor = createPool(24, -50); + assertEquals(24, executor.getMaximumPoolSize()); + assertEquals(24*50, executor.getQueue().remainingCapacity()); } @Test public void testThatThreadPoolSizeAndQueueSizeAutoDetected() { - ThreadPoolExecutor executor = createPool(0, -1); - assertEquals(Runtime.getRuntime().availableProcessors()*4, executor.getMaximumPoolSize()); - assertEquals(executor.getMaximumPoolSize()*4, executor.getQueue().remainingCapacity()); + ThreadPoolExecutor executor = createPool(0, -100); + assertEquals(CPUS*4, executor.getMaximumPoolSize()); + assertEquals(CPUS*4*100, executor.getQueue().remainingCapacity()); } private class FlipIt implements Runnable { |