diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-21 16:11:30 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-21 16:11:30 +0200 |
commit | 21b36d7313ff9f02c93f85964ba682e8bc5dd148 (patch) | |
tree | bf1c8e008d56588483567297d28269c2179d709a /container-core | |
parent | 8df7e4f1fbbba3fcc065948f829b50dcc0f09217 (diff) |
Make ContainerThreadPool an interface
Diffstat (limited to 'container-core')
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/core/config/testutil/HandlersConfigurerTestWrapper.java | 19 | ||||
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java | 5 | ||||
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java | 84 | ||||
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java | 94 | ||||
-rw-r--r-- | container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java (renamed from container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java) | 8 |
5 files changed, 112 insertions, 98 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/config/testutil/HandlersConfigurerTestWrapper.java b/container-core/src/main/java/com/yahoo/container/core/config/testutil/HandlersConfigurerTestWrapper.java index cb177691fa3..2ae33347408 100644 --- a/container-core/src/main/java/com/yahoo/container/core/config/testutil/HandlersConfigurerTestWrapper.java +++ b/container-core/src/main/java/com/yahoo/container/core/config/testutil/HandlersConfigurerTestWrapper.java @@ -13,8 +13,6 @@ import com.yahoo.container.core.config.HandlersConfigurerDi; import com.yahoo.container.di.CloudSubscriberFactory; import com.yahoo.container.di.ComponentDeconstructor; import com.yahoo.container.handler.threadpool.ContainerThreadPool; -import com.yahoo.container.handler.threadpool.ContainerThreadpoolConfig; -import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.handler.RequestHandler; import com.yahoo.language.Linguistics; import com.yahoo.language.simple.SimpleLinguistics; @@ -22,9 +20,10 @@ import com.yahoo.language.simple.SimpleLinguistics; import java.io.File; import java.io.IOException; import java.util.LinkedHashSet; -import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; /** * Class for testing HandlersConfigurer. @@ -138,16 +137,14 @@ public class HandlersConfigurerTestWrapper { protected void configure() { // Needed by e.g. SearchHandler bind(Linguistics.class).to(SimpleLinguistics.class).in(Scopes.SINGLETON); - bind(ContainerThreadPool.class).toInstance( - new ContainerThreadPool( - new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder()), - new Metric() { - @Override public void set(String key, Number val, Context ctx) {} - @Override public void add(String key, Number val, Context ctx) {} - @Override public Context createContext(Map<String, ?> properties) { return null;} - })); + bind(ContainerThreadPool.class).to(SimpleContainerThreadpool.class); } }); } + private static class SimpleContainerThreadpool implements ContainerThreadPool { + private final Executor executor = Executors.newCachedThreadPool(); + @Override public Executor executor() { return executor; } + } + } diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java index dc594903f21..ae313b1b04c 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java +++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java @@ -6,6 +6,7 @@ import com.yahoo.component.AbstractComponent; import com.yahoo.container.di.componentgraph.Provider; import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.handler.threadpool.ContainerThreadpoolConfig; +import com.yahoo.container.handler.threadpool.DefaultContainerThreadpool; import com.yahoo.container.protect.ProcessTerminator; import com.yahoo.jdisc.Metric; @@ -25,11 +26,11 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex @Inject public ThreadPoolProvider(ThreadpoolConfig config, Metric metric) { - this.threadpool = new ContainerThreadPool(translateConfig(config), metric); + this.threadpool = new DefaultContainerThreadpool(translateConfig(config), metric); } public ThreadPoolProvider(ThreadpoolConfig config, Metric metric, ProcessTerminator processTerminator) { - this.threadpool = new ContainerThreadPool(translateConfig(config), metric, processTerminator); + this.threadpool = new DefaultContainerThreadpool(translateConfig(config), metric, processTerminator); } /** diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java index 294453aa8ba..35bd807b126 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java @@ -1,95 +1,17 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.handler.threadpool; -import com.google.inject.Inject; -import com.yahoo.component.AbstractComponent; -import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.container.protect.ProcessTerminator; -import com.yahoo.jdisc.Metric; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; /** * A configurable thread pool. This provides the worker threads used for normal request processing. * - * @author Steinar Knutsen - * @author baldersheim - * @author bratseth * @author bjorncs */ -public class ContainerThreadPool extends AbstractComponent implements AutoCloseable { - - private final ExecutorServiceWrapper threadpool; - - @Inject - public ContainerThreadPool(ContainerThreadpoolConfig config, Metric metric) { - this(config, metric, new ProcessTerminator()); - } - - public ContainerThreadPool(ContainerThreadpoolConfig config, Metric metric, ProcessTerminator processTerminator) { - ThreadPoolMetric threadPoolMetric = new ThreadPoolMetric(metric, config.name()); - int maxNumThreads = computeMaximumThreadPoolSize(config.maxThreads()); - int coreNumThreads = computeCoreThreadPoolSize(config.minThreads(), maxNumThreads); - WorkerCompletionTimingThreadPoolExecutor executor = - new WorkerCompletionTimingThreadPoolExecutor(coreNumThreads, maxNumThreads, - (int)config.keepAliveTime() * 1000, TimeUnit.MILLISECONDS, - createQ(config.queueSize(), maxNumThreads), - ThreadFactoryFactory.getThreadFactory(config.name()), - threadPoolMetric); - // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also - // get the dreaded thread locals initialized even if they will never run. - // That counters what we we want to achieve with the Q that will prefer thread locality. - executor.prestartAllCoreThreads(); - threadpool = new ExecutorServiceWrapper(executor, threadPoolMetric, processTerminator, - config.maxThreadExecutionTimeSeconds() * 1000L); - } - - public Executor executor() { return threadpool; } - public int queuedTasks() { return threadpool.queuedTasks(); } - @Override public void close() { closeInternal(); } - @Override public void deconstruct() { closeInternal(); super.deconstruct(); } - - /** - * Shutdown the thread pool, give a grace period of 1 second before forcibly - * shutting down all worker threads. - */ - private void closeInternal() { - boolean terminated; - - threadpool.shutdown(); - try { - terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - if (!terminated) { - threadpool.shutdownNow(); - } - } - - private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) { - return (queueSize == 0) - ? new SynchronousQueue<>(false) - : (queueSize < 0) - ? new ArrayBlockingQueue<>(maxThreads*4) - : new ArrayBlockingQueue<>(queueSize); - } +public interface ContainerThreadPool extends AutoCloseable { - private static int computeMaximumThreadPoolSize(int maxNumThreads) { - return (maxNumThreads <= 0) - ? Runtime.getRuntime().availableProcessors() * 4 - : maxNumThreads; - } + Executor executor(); - private static int computeCoreThreadPoolSize(int corePoolSize, int maxNumThreads) { - return Math.min( - corePoolSize <= 0 ? Runtime.getRuntime().availableProcessors() * 2 : corePoolSize, - maxNumThreads); - } + default void close() {} } diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java new file mode 100644 index 00000000000..4d8c245a25a --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadpool.java @@ -0,0 +1,94 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.handler.threadpool; + +import com.google.inject.Inject; +import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.container.protect.ProcessTerminator; +import com.yahoo.jdisc.Metric; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +/** + * Default implementation of {@link DefaultContainerThreadpool}. + * + * @author Steinar Knutsen + * @author baldersheim + * @author bratseth + * @author bjorncs + */ +public class DefaultContainerThreadpool extends AbstractComponent implements AutoCloseable, ContainerThreadPool { + + private final ExecutorServiceWrapper threadpool; + + @Inject + public DefaultContainerThreadpool(ContainerThreadpoolConfig config, Metric metric) { + this(config, metric, new ProcessTerminator()); + } + + public DefaultContainerThreadpool(ContainerThreadpoolConfig config, Metric metric, ProcessTerminator processTerminator) { + ThreadPoolMetric threadPoolMetric = new ThreadPoolMetric(metric, config.name()); + int maxNumThreads = computeMaximumThreadPoolSize(config.maxThreads()); + int coreNumThreads = computeCoreThreadPoolSize(config.minThreads(), maxNumThreads); + WorkerCompletionTimingThreadPoolExecutor executor = + new WorkerCompletionTimingThreadPoolExecutor(coreNumThreads, maxNumThreads, + (int)config.keepAliveTime() * 1000, TimeUnit.MILLISECONDS, + createQ(config.queueSize(), maxNumThreads), + ThreadFactoryFactory.getThreadFactory(config.name()), + threadPoolMetric); + // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also + // get the dreaded thread locals initialized even if they will never run. + // That counters what we we want to achieve with the Q that will prefer thread locality. + executor.prestartAllCoreThreads(); + threadpool = new ExecutorServiceWrapper(executor, threadPoolMetric, processTerminator, + config.maxThreadExecutionTimeSeconds() * 1000L); + } + + @Override public Executor executor() { return threadpool; } + @Override public void close() { closeInternal(); } + @Override public void deconstruct() { closeInternal(); super.deconstruct(); } + + /** + * Shutdown the thread pool, give a grace period of 1 second before forcibly + * shutting down all worker threads. + */ + private void closeInternal() { + boolean terminated; + + threadpool.shutdown(); + try { + terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + if (!terminated) { + threadpool.shutdownNow(); + } + } + + private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) { + return (queueSize == 0) + ? new SynchronousQueue<>(false) + : (queueSize < 0) + ? new ArrayBlockingQueue<>(maxThreads*4) + : new ArrayBlockingQueue<>(queueSize); + } + + private static int computeMaximumThreadPoolSize(int maxNumThreads) { + return (maxNumThreads <= 0) + ? Runtime.getRuntime().availableProcessors() * 4 + : maxNumThreads; + } + + private static int computeCoreThreadPoolSize(int corePoolSize, int maxNumThreads) { + return Math.min( + corePoolSize <= 0 ? Runtime.getRuntime().availableProcessors() * 2 : corePoolSize, + maxNumThreads); + } + +} diff --git a/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java b/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java index 02e791099ed..8b1ed12c796 100644 --- a/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java +++ b/container-core/src/test/java/com/yahoo/container/handler/threadpool/DefaultContainerThreadPoolTest.java @@ -20,11 +20,11 @@ import static org.junit.Assert.fail; * @author Steinar Knutsen * @author bjorncs */ -public class ContainerThreadPoolTest { +public class DefaultContainerThreadPoolTest { @Test public final void testThreadPool() throws InterruptedException { ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(1)); - ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class)); + ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class)); Executor exec = threadPool.executor(); Tuple2<Receiver.MessageState, Boolean> reply; FlipIt command = new FlipIt(); @@ -56,7 +56,7 @@ public class ContainerThreadPoolTest { private ThreadPoolExecutor createPool(int maxThreads, int queueSize) { ContainerThreadpoolConfig config = new ContainerThreadpoolConfig(new ContainerThreadpoolConfig.Builder().maxThreads(maxThreads).queueSize(queueSize)); - ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class)); + ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class)); ExecutorServiceWrapper wrapper = (ExecutorServiceWrapper) threadPool.executor(); WorkerCompletionTimingThreadPoolExecutor executor = (WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate(); return executor; @@ -104,7 +104,7 @@ public class ContainerThreadPoolTest { .maxThreads(2) .maxThreadExecutionTimeSeconds(1)); MockProcessTerminator terminator = new MockProcessTerminator(); - ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class), terminator); + ContainerThreadPool threadPool = new DefaultContainerThreadpool(config, Mockito.mock(Metric.class), terminator); // No dying when threads hang shorter than max thread execution time threadPool.executor().execute(new Hang(500)); |