diff options
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java')
-rw-r--r-- | container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java | 186 |
1 files changed, 6 insertions, 180 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java index 0e786cfbc8f..958e958456c 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java +++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java @@ -1,26 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.handler; -import com.google.common.util.concurrent.ForwardingExecutorService; -import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; -import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.di.componentgraph.Provider; +import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.protect.ProcessTerminator; import com.yahoo.jdisc.Metric; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * A configurable thread pool provider. This provides the worker threads used for normal request processing. @@ -32,40 +19,14 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ThreadPoolProvider extends AbstractComponent implements Provider<Executor> { - private final ExecutorServiceWrapper threadpool; + private final ContainerThreadPool threadpool; - private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) { - return (queueSize == 0) - ? new SynchronousQueue<>(false) - : (queueSize < 0) - ? new ArrayBlockingQueue<>(maxThreads*4) - : new ArrayBlockingQueue<>(queueSize); - } - - private static int computeThreadPoolSize(int maxNumThreads) { - return (maxNumThreads <= 0) - ? Runtime.getRuntime().availableProcessors() * 4 - : maxNumThreads; - } - @Inject public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric) { - this(threadpoolConfig, metric, new ProcessTerminator()); + this.threadpool = new ContainerThreadPool(threadpoolConfig, metric); } public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) { - int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads()); - WorkerCompletionTimingThreadPoolExecutor executor = - new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads, - 0L, TimeUnit.SECONDS, - createQ(threadpoolConfig.queueSize(), maxNumThreads), - ThreadFactoryFactory.getThreadFactory("threadpool"), - metric); - // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also - // get the dreaded thread locals initialized even if they will never run. - // That counters what we we want to achieve with the Q that will prefer thread locality. - executor.prestartAllCoreThreads(); - threadpool = new ExecutorServiceWrapper(executor, metric, processTerminator, - threadpoolConfig.maxThreadExecutionTimeSeconds() * 1000L); + this.threadpool = new ContainerThreadPool(threadpoolConfig, metric, processTerminator); } /** @@ -75,7 +36,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex * @return a possibly shared executor */ @Override - public Executor get() { return threadpool; } + public Executor get() { return threadpool.executor(); } /** * Shutdown the thread pool, give a grace period of 1 second before forcibly @@ -83,142 +44,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex */ @Override public void deconstruct() { - boolean terminated; - - super.deconstruct(); - threadpool.shutdown(); - try { - terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - if (!terminated) { - threadpool.shutdownNow(); - } - } - - /** - * A service executor wrapper which emits metrics and - * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state. - * Package private for testing - */ - final static class ExecutorServiceWrapper extends ForwardingExecutorService { - - private final WorkerCompletionTimingThreadPoolExecutor wrapped; - private final Metric metric; - private final ProcessTerminator processTerminator; - private final long maxThreadExecutionTimeMillis; - private final Thread metricReporter; - private final AtomicBoolean closed = new AtomicBoolean(false); - - private ExecutorServiceWrapper(WorkerCompletionTimingThreadPoolExecutor wrapped, - Metric metric, ProcessTerminator processTerminator, - long maxThreadExecutionTimeMillis) { - this.wrapped = wrapped; - this.metric = metric; - this.processTerminator = processTerminator; - this.maxThreadExecutionTimeMillis = maxThreadExecutionTimeMillis; - - metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null); - metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null); - metric.add(MetricNames.REJECTED_REQUEST, 0, null); - metricReporter = new Thread(this::reportMetrics); - metricReporter.setDaemon(true); - metricReporter.start(); - } - - private final void reportMetrics() { - try { - while (!closed.get()) { - metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null); - metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null); - Thread.sleep(100); - } - } catch (InterruptedException e) { } - } - - @Override - public void shutdown() { - super.shutdown(); - closed.set(true); - } - - /** - * Tracks all instances of RejectedExecutionException. - * ThreadPoolProvider returns an executor, so external uses will not - * have access to the methods declared by ExecutorService. - * (execute(Runnable) is declared by Executor.) - */ - @Override - public void execute(Runnable command) { - try { - super.execute(command); - } catch (RejectedExecutionException e) { - metric.add(MetricNames.REJECTED_REQUEST, 1, null); - long timeSinceLastReturnedThreadMillis = System.currentTimeMillis() - wrapped.lastThreadAssignmentTimeMillis; - if (timeSinceLastReturnedThreadMillis > maxThreadExecutionTimeMillis) - processTerminator.logAndDie("No worker threads have been available for " + - timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true); - throw e; - } - } - - @Override - protected ExecutorService delegate() { return wrapped; } - - private static final class MetricNames { - private static final String REJECTED_REQUEST = "serverRejectedRequests"; - private static final String THREAD_POOL_SIZE = "serverThreadPoolSize"; - private static final String ACTIVE_THREADS = "serverActiveThreads"; - } - - } - - /** - * A thread pool executor which maintains the last time a worker completed - * package private for testing - **/ - final static class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor { - - private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions"; - - volatile long lastThreadAssignmentTimeMillis = System.currentTimeMillis(); - private final AtomicLong startedCount = new AtomicLong(0); - private final AtomicLong completedCount = new AtomicLong(0); - private final Metric metric; - - public WorkerCompletionTimingThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - BlockingQueue<Runnable> workQueue, - ThreadFactory threadFactory, - Metric metric) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - this.metric = metric; - } - - @Override - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); - lastThreadAssignmentTimeMillis = System.currentTimeMillis(); - startedCount.incrementAndGet(); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - completedCount.incrementAndGet(); - if (t != null) { - metric.add(UNHANDLED_EXCEPTIONS_METRIC, 1L, metric.createContext(Map.of("exception", t.getClass().getSimpleName()))); - } - } - - @Override - public int getActiveCount() { - return (int)(startedCount.get() - completedCount.get()); - } + threadpool.deconstruct(); } } |