summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 21:38:55 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 21:38:55 +0000
commitffa02ec79606ee8a48619cde15ad3a3e4bcbbd90 (patch)
tree6126d7ce1111fcd23a0a8020018a7e476232f010 /vespajlib
parent6c3f5eedc1d288cc1e5e71a2cbd7da02d5415016 (diff)
Add missing files.
Diffstat (limited to 'vespajlib')
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java63
-rw-r--r--vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java43
2 files changed, 106 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java b/vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java
new file mode 100644
index 00000000000..42e86aad1ba
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java
@@ -0,0 +1,63 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.concurrent;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor that will first try a bounded cached threadpool before falling back to a unbounded
+ * single threaded threadpool that will take over dispatching to the primary pool.
+ *
+ */
+public class CachedThreadPoolWithFallback implements AutoCloseable, Executor {
+ private final ExecutorService primary;
+ private final ExecutorService secondary;
+ public CachedThreadPoolWithFallback(String baseName, int corePoolSize, int maximumPoolSize, long keepAlimeTime, TimeUnit timeUnit) {
+ primary = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAlimeTime, timeUnit,
+ new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory(baseName + ".primary"));
+ secondary = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory(baseName + ".secondary"));
+ }
+ @Override
+ public void execute(Runnable command) {
+ try {
+ primary.execute(command);
+ } catch (RejectedExecutionException e1) {
+ secondary.execute(() -> retryForever(command));
+ }
+ }
+ private void retryForever(Runnable command) {
+ while (true) {
+ try {
+ primary.execute(command);
+ return;
+ } catch (RejectedExecutionException rejected) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException silenced) { }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ secondary.shutdown();
+ join(secondary);
+ primary.shutdown();
+ join(primary);
+ }
+ private static void join(ExecutorService executor) {
+ while (true) {
+ try {
+ if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java
new file mode 100644
index 00000000000..52e17631a34
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java
@@ -0,0 +1,43 @@
+// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.concurrent;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+public class CachedThreadPoolWithFallbackTest {
+ private static void countAndBlock(AtomicLong counter, long waitLimit) {
+ counter.incrementAndGet();
+ try {
+ synchronized (counter) {
+ while (counter.get() < waitLimit) {
+ counter.wait();
+ }
+ }
+ } catch (InterruptedException e) {}
+ }
+
+ @Test
+ public void testThatTaskAreQueued() throws InterruptedException {
+ CachedThreadPoolWithFallback executor = new CachedThreadPoolWithFallback("test", 1, 30, 1, TimeUnit.SECONDS);
+ AtomicLong counter = new AtomicLong(0);
+ for (int i = 0; i < 1000; i++) {
+ executor.execute(() -> countAndBlock(counter, 100));
+ }
+ while (counter.get() < 30) {
+ Thread.sleep(1);
+ }
+ Thread.sleep(1);
+ assertEquals(30L, counter.get());
+ counter.set(100);
+ synchronized (counter) {
+ counter.notifyAll();
+ }
+ executor.close();
+ assertEquals(1070L, counter.get());
+ }
+}