summaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-06-03 15:45:31 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2020-06-03 15:50:22 +0200
commit18e9f06334c30ed4d73eeabaa0e6205e79c56c82 (patch)
tree8a829a3befd17eb8d1fe33c66215d84eff486033 /container-core
parent0ca803d1e89e79b0167e9fe545575356f6555ec5 (diff)
Split out thread pool implementation to separate classes
Diffstat (limited to 'container-core')
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java186
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java87
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java94
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java63
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java8
-rw-r--r--container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java (renamed from container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java)76
6 files changed, 294 insertions, 220 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
index 0e786cfbc8f..958e958456c 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/ThreadPoolProvider.java
@@ -1,26 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.handler;
-import com.google.common.util.concurrent.ForwardingExecutorService;
-import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.di.componentgraph.Provider;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.protect.ProcessTerminator;
import com.yahoo.jdisc.Metric;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* A configurable thread pool provider. This provides the worker threads used for normal request processing.
@@ -32,40 +19,14 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ThreadPoolProvider extends AbstractComponent implements Provider<Executor> {
- private final ExecutorServiceWrapper threadpool;
+ private final ContainerThreadPool threadpool;
- private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) {
- return (queueSize == 0)
- ? new SynchronousQueue<>(false)
- : (queueSize < 0)
- ? new ArrayBlockingQueue<>(maxThreads*4)
- : new ArrayBlockingQueue<>(queueSize);
- }
-
- private static int computeThreadPoolSize(int maxNumThreads) {
- return (maxNumThreads <= 0)
- ? Runtime.getRuntime().availableProcessors() * 4
- : maxNumThreads;
- }
- @Inject
public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric) {
- this(threadpoolConfig, metric, new ProcessTerminator());
+ this.threadpool = new ContainerThreadPool(threadpoolConfig, metric);
}
public ThreadPoolProvider(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) {
- int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads());
- WorkerCompletionTimingThreadPoolExecutor executor =
- new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads,
- 0L, TimeUnit.SECONDS,
- createQ(threadpoolConfig.queueSize(), maxNumThreads),
- ThreadFactoryFactory.getThreadFactory("threadpool"),
- metric);
- // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also
- // get the dreaded thread locals initialized even if they will never run.
- // That counters what we we want to achieve with the Q that will prefer thread locality.
- executor.prestartAllCoreThreads();
- threadpool = new ExecutorServiceWrapper(executor, metric, processTerminator,
- threadpoolConfig.maxThreadExecutionTimeSeconds() * 1000L);
+ this.threadpool = new ContainerThreadPool(threadpoolConfig, metric, processTerminator);
}
/**
@@ -75,7 +36,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex
* @return a possibly shared executor
*/
@Override
- public Executor get() { return threadpool; }
+ public Executor get() { return threadpool.executor(); }
/**
* Shutdown the thread pool, give a grace period of 1 second before forcibly
@@ -83,142 +44,7 @@ public class ThreadPoolProvider extends AbstractComponent implements Provider<Ex
*/
@Override
public void deconstruct() {
- boolean terminated;
-
- super.deconstruct();
- threadpool.shutdown();
- try {
- terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- if (!terminated) {
- threadpool.shutdownNow();
- }
- }
-
- /**
- * A service executor wrapper which emits metrics and
- * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state.
- * Package private for testing
- */
- final static class ExecutorServiceWrapper extends ForwardingExecutorService {
-
- private final WorkerCompletionTimingThreadPoolExecutor wrapped;
- private final Metric metric;
- private final ProcessTerminator processTerminator;
- private final long maxThreadExecutionTimeMillis;
- private final Thread metricReporter;
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- private ExecutorServiceWrapper(WorkerCompletionTimingThreadPoolExecutor wrapped,
- Metric metric, ProcessTerminator processTerminator,
- long maxThreadExecutionTimeMillis) {
- this.wrapped = wrapped;
- this.metric = metric;
- this.processTerminator = processTerminator;
- this.maxThreadExecutionTimeMillis = maxThreadExecutionTimeMillis;
-
- metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
- metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
- metric.add(MetricNames.REJECTED_REQUEST, 0, null);
- metricReporter = new Thread(this::reportMetrics);
- metricReporter.setDaemon(true);
- metricReporter.start();
- }
-
- private final void reportMetrics() {
- try {
- while (!closed.get()) {
- metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
- metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
- Thread.sleep(100);
- }
- } catch (InterruptedException e) { }
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- closed.set(true);
- }
-
- /**
- * Tracks all instances of RejectedExecutionException.
- * ThreadPoolProvider returns an executor, so external uses will not
- * have access to the methods declared by ExecutorService.
- * (execute(Runnable) is declared by Executor.)
- */
- @Override
- public void execute(Runnable command) {
- try {
- super.execute(command);
- } catch (RejectedExecutionException e) {
- metric.add(MetricNames.REJECTED_REQUEST, 1, null);
- long timeSinceLastReturnedThreadMillis = System.currentTimeMillis() - wrapped.lastThreadAssignmentTimeMillis;
- if (timeSinceLastReturnedThreadMillis > maxThreadExecutionTimeMillis)
- processTerminator.logAndDie("No worker threads have been available for " +
- timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true);
- throw e;
- }
- }
-
- @Override
- protected ExecutorService delegate() { return wrapped; }
-
- private static final class MetricNames {
- private static final String REJECTED_REQUEST = "serverRejectedRequests";
- private static final String THREAD_POOL_SIZE = "serverThreadPoolSize";
- private static final String ACTIVE_THREADS = "serverActiveThreads";
- }
-
- }
-
- /**
- * A thread pool executor which maintains the last time a worker completed
- * package private for testing
- **/
- final static class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor {
-
- private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions";
-
- volatile long lastThreadAssignmentTimeMillis = System.currentTimeMillis();
- private final AtomicLong startedCount = new AtomicLong(0);
- private final AtomicLong completedCount = new AtomicLong(0);
- private final Metric metric;
-
- public WorkerCompletionTimingThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- Metric metric) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- this.metric = metric;
- }
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
- lastThreadAssignmentTimeMillis = System.currentTimeMillis();
- startedCount.incrementAndGet();
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- completedCount.incrementAndGet();
- if (t != null) {
- metric.add(UNHANDLED_EXCEPTIONS_METRIC, 1L, metric.createContext(Map.of("exception", t.getClass().getSimpleName())));
- }
- }
-
- @Override
- public int getActiveCount() {
- return (int)(startedCount.get() - completedCount.get());
- }
+ threadpool.deconstruct();
}
}
diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java
new file mode 100644
index 00000000000..0f3be65f85f
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ContainerThreadPool.java
@@ -0,0 +1,87 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.handler.threadpool;
+
+import com.google.inject.Inject;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.container.handler.ThreadpoolConfig;
+import com.yahoo.container.protect.ProcessTerminator;
+import com.yahoo.jdisc.Metric;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A configurable thread pool. This provides the worker threads used for normal request processing.
+ *
+ * @author Steinar Knutsen
+ * @author baldersheim
+ * @author bratseth
+ * @author bjorncs
+ */
+public class ContainerThreadPool extends AbstractComponent implements AutoCloseable {
+
+ private final ExecutorServiceWrapper threadpool;
+
+ @Inject
+ public ContainerThreadPool(ThreadpoolConfig config, Metric metric) {
+ this(config, metric, new ProcessTerminator());
+ }
+
+ public ContainerThreadPool(ThreadpoolConfig threadpoolConfig, Metric metric, ProcessTerminator processTerminator) {
+ int maxNumThreads = computeThreadPoolSize(threadpoolConfig.maxthreads());
+ WorkerCompletionTimingThreadPoolExecutor executor =
+ new WorkerCompletionTimingThreadPoolExecutor(maxNumThreads, maxNumThreads,
+ 0L, TimeUnit.SECONDS,
+ createQ(threadpoolConfig.queueSize(), maxNumThreads),
+ ThreadFactoryFactory.getThreadFactory("threadpool"),
+ metric);
+ // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also
+ // get the dreaded thread locals initialized even if they will never run.
+ // That counters what we we want to achieve with the Q that will prefer thread locality.
+ executor.prestartAllCoreThreads();
+ threadpool = new ExecutorServiceWrapper(executor, metric, processTerminator,
+ threadpoolConfig.maxThreadExecutionTimeSeconds() * 1000L);
+ }
+
+ public Executor executor() { return threadpool; }
+ @Override public void deconstruct() { closeInternal(); }
+ @Override public void close() { closeInternal(); }
+
+ /**
+ * Shutdown the thread pool, give a grace period of 1 second before forcibly
+ * shutting down all worker threads.
+ */
+ private void closeInternal() {
+ boolean terminated;
+
+ super.deconstruct();
+ threadpool.shutdown();
+ try {
+ terminated = threadpool.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (!terminated) {
+ threadpool.shutdownNow();
+ }
+ }
+
+ private static BlockingQueue<Runnable> createQ(int queueSize, int maxThreads) {
+ return (queueSize == 0)
+ ? new SynchronousQueue<>(false)
+ : (queueSize < 0)
+ ? new ArrayBlockingQueue<>(maxThreads*4)
+ : new ArrayBlockingQueue<>(queueSize);
+ }
+
+ private static int computeThreadPoolSize(int maxNumThreads) {
+ return (maxNumThreads <= 0)
+ ? Runtime.getRuntime().availableProcessors() * 4
+ : maxNumThreads;
+ }
+}
diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java
new file mode 100644
index 00000000000..f7b0a22120a
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/ExecutorServiceWrapper.java
@@ -0,0 +1,94 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.handler.threadpool;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.yahoo.container.protect.ProcessTerminator;
+import com.yahoo.jdisc.Metric;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A service executor wrapper which emits metrics and
+ * shuts down the vm when no workers are available for too long to avoid containers lingering in a blocked state.
+ * Package private for testing
+ *
+ * @author Steinar Knutsen
+ * @author baldersheim
+ * @author bratseth
+ */
+class ExecutorServiceWrapper extends ForwardingExecutorService {
+
+ private final WorkerCompletionTimingThreadPoolExecutor wrapped;
+ private final Metric metric;
+ private final ProcessTerminator processTerminator;
+ private final long maxThreadExecutionTimeMillis;
+ private final Thread metricReporter;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ ExecutorServiceWrapper(
+ WorkerCompletionTimingThreadPoolExecutor wrapped,
+ Metric metric, ProcessTerminator processTerminator,
+ long maxThreadExecutionTimeMillis) {
+ this.wrapped = wrapped;
+ this.metric = metric;
+ this.processTerminator = processTerminator;
+ this.maxThreadExecutionTimeMillis = maxThreadExecutionTimeMillis;
+
+ metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
+ metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
+ metric.add(MetricNames.REJECTED_REQUEST, 0, null);
+ metricReporter = new Thread(this::reportMetrics);
+ metricReporter.setDaemon(true);
+ metricReporter.start();
+ }
+
+ private final void reportMetrics() {
+ try {
+ while (!closed.get()) {
+ metric.set(MetricNames.THREAD_POOL_SIZE, wrapped.getPoolSize(), null);
+ metric.set(MetricNames.ACTIVE_THREADS, wrapped.getActiveCount(), null);
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) { }
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ closed.set(true);
+ }
+
+ /**
+ * Tracks all instances of {@link RejectedExecutionException}.
+ * {@link ContainerThreadPool} returns an executor, so external uses will not
+ * have access to the methods declared by {@link ExecutorService}.
+ * ({@link Executor#execute(Runnable)} is declared by {@link Executor}.)
+ */
+ @Override
+ public void execute(Runnable command) {
+ try {
+ super.execute(command);
+ } catch (RejectedExecutionException e) {
+ metric.add(MetricNames.REJECTED_REQUEST, 1, null);
+ long timeSinceLastReturnedThreadMillis = System.currentTimeMillis() - wrapped.lastThreadAssignmentTimeMillis;
+ if (timeSinceLastReturnedThreadMillis > maxThreadExecutionTimeMillis)
+ processTerminator.logAndDie("No worker threads have been available for " +
+ timeSinceLastReturnedThreadMillis + " ms. Shutting down.", true);
+ throw e;
+ }
+ }
+
+ @Override
+ protected ExecutorService delegate() { return wrapped; }
+
+ private static final class MetricNames {
+ private static final String REJECTED_REQUEST = "serverRejectedRequests";
+ private static final String THREAD_POOL_SIZE = "serverThreadPoolSize";
+ private static final String ACTIVE_THREADS = "serverActiveThreads";
+ }
+
+}
+
diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java
new file mode 100644
index 00000000000..9742e7ecfc3
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/WorkerCompletionTimingThreadPoolExecutor.java
@@ -0,0 +1,63 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.handler.threadpool;
+
+import com.yahoo.jdisc.Metric;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A thread pool executor which maintains the last time a worker completed
+ * package private for testing
+ *
+ * @author Steinar Knutsen
+ * @author baldersheim
+ * @author bratseth
+ */
+class WorkerCompletionTimingThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private static final String UNHANDLED_EXCEPTIONS_METRIC = "jdisc.thread_pool.unhandled_exceptions";
+
+ volatile long lastThreadAssignmentTimeMillis = System.currentTimeMillis();
+ private final AtomicLong startedCount = new AtomicLong(0);
+ private final AtomicLong completedCount = new AtomicLong(0);
+ private final Metric metric;
+
+ WorkerCompletionTimingThreadPoolExecutor(
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ Metric metric) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ this.metric = metric;
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t, r);
+ lastThreadAssignmentTimeMillis = System.currentTimeMillis();
+ startedCount.incrementAndGet();
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ completedCount.incrementAndGet();
+ if (t != null) {
+ metric.add(UNHANDLED_EXCEPTIONS_METRIC, 1L, metric.createContext(Map.of("exception", t.getClass().getSimpleName())));
+ }
+ }
+
+ @Override
+ public int getActiveCount() {
+ return (int)(startedCount.get() - completedCount.get());
+ }
+}
+
diff --git a/container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java b/container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java
new file mode 100644
index 00000000000..6a94cea49da
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/handler/threadpool/package-info.java
@@ -0,0 +1,8 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * @author bjorncs
+ */
+@ExportPackage
+package com.yahoo.container.handler.threadpool;
+
+import com.yahoo.osgi.annotation.ExportPackage; \ No newline at end of file
diff --git a/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java b/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java
index 761ed40763c..7998bbc4872 100644
--- a/container-core/src/test/java/com/yahoo/container/handler/ThreadPoolProviderTestCase.java
+++ b/container-core/src/test/java/com/yahoo/container/handler/threadpool/ContainerThreadPoolTest.java
@@ -1,37 +1,33 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.container.handler;
-
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.handler.threadpool;
+import com.yahoo.collections.Tuple2;
+import com.yahoo.concurrent.Receiver;
+import com.yahoo.container.handler.ThreadpoolConfig;
import com.yahoo.container.protect.ProcessTerminator;
+import com.yahoo.jdisc.Metric;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
-import com.yahoo.concurrent.Receiver;
-import com.yahoo.concurrent.Receiver.MessageState;
-import com.yahoo.collections.Tuple2;
-import com.yahoo.jdisc.Metric;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
- * Check threadpool provider accepts tasks and shuts down properly.
- *
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author Steinar Knutsen
+ * @author bjorncs
*/
-public class ThreadPoolProviderTestCase {
-
+public class ContainerThreadPoolTest {
@Test
- public final void testThreadPoolProvider() throws InterruptedException {
+ public final void testThreadPool() throws InterruptedException {
ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(1));
- ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class));
- Executor exec = provider.get();
- Tuple2<MessageState, Boolean> reply;
+ ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class));
+ Executor exec = threadPool.executor();
+ Tuple2<Receiver.MessageState, Boolean> reply;
FlipIt command = new FlipIt();
for (boolean done = false; !done;) {
try {
@@ -42,13 +38,13 @@ public class ThreadPoolProviderTestCase {
}
}
reply = command.didItRun.get(5 * 60 * 1000);
- if (reply.first != MessageState.VALID) {
+ if (reply.first != Receiver.MessageState.VALID) {
fail("Executor task probably timed out, five minutes should be enough to flip a boolean.");
}
if (reply.second != Boolean.TRUE) {
fail("Executor task seemed to run, but did not get correct value.");
}
- provider.deconstruct();
+ threadPool.deconstruct();
command = new FlipIt();
try {
exec.execute(command);
@@ -61,9 +57,9 @@ public class ThreadPoolProviderTestCase {
private ThreadPoolExecutor createPool(int maxThreads, int queueSize) {
ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(maxThreads).queueSize(queueSize));
- ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class));
- ThreadPoolProvider.ExecutorServiceWrapper wrapper = (ThreadPoolProvider.ExecutorServiceWrapper) provider.get();
- ThreadPoolProvider.WorkerCompletionTimingThreadPoolExecutor executor = (ThreadPoolProvider.WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate();
+ ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class));
+ ExecutorServiceWrapper wrapper = (ExecutorServiceWrapper) threadPool.executor();
+ WorkerCompletionTimingThreadPoolExecutor executor = (WorkerCompletionTimingThreadPoolExecutor)wrapper.delegate();
return executor;
}
@@ -103,37 +99,37 @@ public class ThreadPoolProviderTestCase {
@Test
@Ignore // Ignored because it depends on the system time and so is unstable on factory
- public void testThreadPoolProviderTerminationOnBreakdown() throws InterruptedException {
+ public void testThreadPoolTerminationOnBreakdown() throws InterruptedException {
ThreadpoolConfig config = new ThreadpoolConfig(new ThreadpoolConfig.Builder().maxthreads(2)
- .maxThreadExecutionTimeSeconds(1));
+ .maxThreadExecutionTimeSeconds(1));
MockProcessTerminator terminator = new MockProcessTerminator();
- ThreadPoolProvider provider = new ThreadPoolProvider(config, Mockito.mock(Metric.class), terminator);
+ ContainerThreadPool threadPool = new ContainerThreadPool(config, Mockito.mock(Metric.class), terminator);
// No dying when threads hang shorter than max thread execution time
- provider.get().execute(new Hang(500));
- provider.get().execute(new Hang(500));
+ threadPool.executor().execute(new Hang(500));
+ threadPool.executor().execute(new Hang(500));
assertEquals(0, terminator.dieRequests);
- assertRejected(provider, new Hang(500)); // no more threads
+ assertRejected(threadPool, new Hang(500)); // no more threads
assertEquals(0, terminator.dieRequests); // ... but not for long enough yet
try { Thread.sleep(1500); } catch (InterruptedException e) {}
- provider.get().execute(new Hang(1));
+ threadPool.executor().execute(new Hang(1));
assertEquals(0, terminator.dieRequests);
try { Thread.sleep(50); } catch (InterruptedException e) {} // Make sure both threads are available
// Dying when hanging both thread pool threads for longer than max thread execution time
- provider.get().execute(new Hang(2000));
- provider.get().execute(new Hang(2000));
+ threadPool.executor().execute(new Hang(2000));
+ threadPool.executor().execute(new Hang(2000));
assertEquals(0, terminator.dieRequests);
- assertRejected(provider, new Hang(2000)); // no more threads
+ assertRejected(threadPool, new Hang(2000)); // no more threads
assertEquals(0, terminator.dieRequests); // ... but not for long enough yet
try { Thread.sleep(1500); } catch (InterruptedException e) {}
- assertRejected(provider, new Hang(2000)); // no more threads
+ assertRejected(threadPool, new Hang(2000)); // no more threads
assertEquals(1, terminator.dieRequests); // ... for longer than maxThreadExecutionTime
}
- private void assertRejected(ThreadPoolProvider provider, Runnable task) {
+ private void assertRejected(ContainerThreadPool threadPool, Runnable task) {
try {
- provider.get().execute(task);
+ threadPool.executor().execute(task);
fail("Expected execution rejected");
} catch (final RejectedExecutionException expected) {
}
@@ -165,4 +161,4 @@ public class ThreadPoolProviderTestCase {
}
-}
+} \ No newline at end of file