aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-06-03 15:45:31 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2020-06-03 15:50:22 +0200
commit18e9f06334c30ed4d73eeabaa0e6205e79c56c82 (patch)
tree8a829a3befd17eb8d1fe33c66215d84eff486033 /container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java
parent0ca803d1e89e79b0167e9fe545575356f6555ec5 (diff)
Split out thread pool implementation to separate classes
Diffstat (limited to 'container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java94
1 files changed, 94 insertions, 0 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java
new file mode 100644
index 00000000000..f7b0a22120a
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java
@@ -0,0 +1,94 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.handler.threadpool;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.yahoo.container.protect.ProcessTerminator;
+import com.yahoo.jdisc.Metric;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A service executor wrapper which emits metrics and
+ * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state.
+ * Package private for testing
+ *
+ * @author Steinar Knutsen
+ * @author baldersheim
+ * @author bratseth
+ */
+class ExecutorServiceWrapper extends ForwardingExecutorService {
+
+ private final WorkerCompletionTimingThreadPoolExecutor wrapped;
+ private final Metric metric;
+ private final ProcessTerminator processTerminator;
+ private final long maxThreadExecutionTimeMillis;
+ private final Thread metricReporter;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ ExecutorServiceWrapper(
+ WorkerCompletionTimingThreadPoolExecutor wrapped,
+ Metric metric, ProcessTerminator processTerminator,
+ long maxThreadExecutionTimeMillis) {
+ this.wrapped = wrapped;
+ this.metric = metric;
+ this.processTerminator = processTerminator;
+ this.maxThreadExecutionTimeMillis = maxThreadExecutionTimeMillis;
+
+ metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
+ metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
+ metric.add(MetricNames.REJECTED_REQUEST, 0, null);
+ metricReporter = new Thread(this::reportMetrics);
+ metricReporter.setDaemon(true);
+ metricReporter.start();
+ }
+
+ private final void reportMetrics() {
+ try {
+ while (!closed.get()) {
+ metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
+ metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) { }
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ closed.set(true);
+ }
+
+ /**
+ * Tracks all instances of {@link RejectedExecutionException}.
+ * {@link ContainerThreadPool} returns an executor, so external uses will not
+ * have access to the methods declared by {@link ExecutorService}.
+ * ({@link Executor#execute(Runnable)} is declared by {@link Executor}.)
+ */
+ @Override
+ public void execute(Runnable command) {
+ try {
+ super.execute(command);
+ } catch (RejectedExecutionException e) {
+ metric.add(MetricNames.REJECTED_REQUEST, 1, null);
+ long timeSinceLastReturnedThreadMillis = System.currentTimeMillis() - wrapped.lastThreadAssignmentTimeMillis;
+ if (timeSinceLastReturnedThreadMillis > maxThreadExecutionTimeMillis)
+ processTerminator.logAndDie("No worker threads have been available for " +
+ timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true);
+ throw e;
+ }
+ }
+
+ @Override
+ protected ExecutorService delegate() { return wrapped; }
+
+ private static final class MetricNames {
+ private static final String REJECTED_REQUEST = "serverRejectedRequests";
+ private static final String THREAD_POOL_SIZE = "serverThreadPoolSize";
+ private static final String ACTIVE_THREADS = "serverActiveThreads";
+ }
+
+}
+