summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java186
1 files changed, 6 insertions, 180 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
index 0e786cfbc8f..958e958456c 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
@@ -1,26 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.handler;
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.di.componentgraph.Provider;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.protect.ProcessTerminator;
import com.yahoo.jdisc.Metric;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* A configurable thread pool provider. This provides the worker threads used for normal request processing.
@@ -32,40 +19,14 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ThreadPoolProvider extends AbstractComponent implements Provider<Executor> {
- private final ExecutorServiceWrapper threadpool;
+ private final ContainerThreadPool threadpool;
- private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) {
- return (queueSize == 0)
- ? new SynchronousQueue<>(false)
- : (queueSize < 0)
- ? new ArrayBlockingQueue<>(maxThreads*4)
- : new ArrayBlockingQueue<>(queueSize);
- }
-
- private static int computeThreadPoolSize(int maxNumThreads) {
- return (maxNumThreads <= 0)
- ? Runtime.getRuntime().availableProcessors() * 4
- : maxNumThreads;
- }
- @Inject
public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric) {
- this(threadpoolConfig, metric, new ProcessTerminator());
+ this.threadpool = new ContainerThreadPool(threadpoolConfig, metric);
}
public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) {
- int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads());
- WorkerCompletionTimingThreadPoolExecutor executor =
- new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads,
- 0L, TimeUnit.SECONDS,
- createQ(threadpoolConfig.queueSize(), maxNumThreads),
- ThreadFactoryFactory.getThreadFactory("threadpool"),
- metric);
- // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also
- // get the dreaded thread locals initialized even if they will never run.
- // That counters what we we want to achieve with the Q that will prefer thread locality.
- executor.prestartAllCoreThreads();
- threadpool = new ExecutorServiceWrapper(executor, metric, processTerminator,
- threadpoolConfig.maxThreadExecutionTimeSeconds() * 1000L);
+ this.threadpool = new ContainerThreadPool(threadpoolConfig, metric, processTerminator);
}
/**
@@ -75,7 +36,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex
* @return a possibly shared executor
*/
@Override
- public Executor get() { return threadpool; }
+ public Executor get() { return threadpool.executor(); }
/**
* Shutdown the thread pool, give a grace period of 1 second before forcibly
@@ -83,142 +44,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex
*/
@Override
public void deconstruct() {
- boolean terminated;
-
- super.deconstruct();
- threadpool.shutdown();
- try {
- terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- if (!terminated) {
- threadpool.shutdownNow();
- }
- }
-
- /**
- * A service executor wrapper which emits metrics and
- * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state.
- * Package private for testing
- */
- final static class ExecutorServiceWrapper extends ForwardingExecutorService {
-
- private final WorkerCompletionTimingThreadPoolExecutor wrapped;
- private final Metric metric;
- private final ProcessTerminator processTerminator;
- private final long maxThreadExecutionTimeMillis;
- private final Thread metricReporter;
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- private ExecutorServiceWrapper(WorkerCompletionTimingThreadPoolExecutor wrapped,
- Metric metric, ProcessTerminator processTerminator,
- long maxThreadExecutionTimeMillis) {
- this.wrapped = wrapped;
- this.metric = metric;
- this.processTerminator = processTerminator;
- this.maxThreadExecutionTimeMillis = maxThreadExecutionTimeMillis;
-
- metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
- metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
- metric.add(MetricNames.REJECTED_REQUEST, 0, null);
- metricReporter = new Thread(this::reportMetrics);
- metricReporter.setDaemon(true);
- metricReporter.start();
- }
-
- private final void reportMetrics() {
- try {
- while (!closed.get()) {
- metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
- metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
- Thread.sleep(100);
- }
- } catch (InterruptedException e) { }
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- closed.set(true);
- }
-
- /**
- * Tracks all instances of RejectedExecutionException.
- * ThreadPoolProvider returns an executor, so external uses will not
- * have access to the methods declared by ExecutorService.
- * (execute(Runnable) is declared by Executor.)
- */
- @Override
- public void execute(Runnable command) {
- try {
- super.execute(command);
- } catch (RejectedExecutionException e) {
- metric.add(MetricNames.REJECTED_REQUEST, 1, null);
- long timeSinceLastReturnedThreadMillis = System.currentTimeMillis() - wrapped.lastThreadAssignmentTimeMillis;
- if (timeSinceLastReturnedThreadMillis > maxThreadExecutionTimeMillis)
- processTerminator.logAndDie("No worker threads have been available for " +
- timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true);
- throw e;
- }
- }
-
- @Override
- protected ExecutorService delegate() { return wrapped; }
-
- private static final class MetricNames {
- private static final String REJECTED_REQUEST = "serverRejectedRequests";
- private static final String THREAD_POOL_SIZE = "serverThreadPoolSize";
- private static final String ACTIVE_THREADS = "serverActiveThreads";
- }
-
- }
-
- /**
- * A thread pool executor which maintains the last time a worker completed
- * package private for testing
- **/
- final static class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor {
-
- private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions";
-
- volatile long lastThreadAssignmentTimeMillis = System.currentTimeMillis();
- private final AtomicLong startedCount = new AtomicLong(0);
- private final AtomicLong completedCount = new AtomicLong(0);
- private final Metric metric;
-
- public WorkerCompletionTimingThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- Metric metric) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- this.metric = metric;
- }
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
- lastThreadAssignmentTimeMillis = System.currentTimeMillis();
- startedCount.incrementAndGet();
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- completedCount.incrementAndGet();
- if (t != null) {
- metric.add(UNHANDLED_EXCEPTIONS_METRIC, 1L, metric.createContext(Map.of("exception", t.getClass().getSimpleName())));
- }
- }
-
- @Override
- public int getActiveCount() {
- return (int)(startedCount.get() - completedCount.get());
- }
+ threadpool.deconstruct();
}
}