diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 21:38:55 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 21:38:55 +0000 |
commit | ffa02ec79606ee8a48619cde15ad3a3e4bcbbd90 (patch) | |
tree | 6126d7ce1111fcd23a0a8020018a7e476232f010 /vespajlib/src | |
parent | 6c3f5eedc1d288cc1e5e71a2cbd7da02d5415016 (diff) |
Add missing files.
Diffstat (limited to 'vespajlib/src')
-rw-r--r-- | vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java | 63 | ||||
-rw-r--r-- | vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java | 43 |
2 files changed, 106 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java b/vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java new file mode 100644 index 00000000000..42e86aad1ba --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/CachedThreadPoolWithFallback.java @@ -0,0 +1,63 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.concurrent; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * An executor that will first try a bounded cached threadpool before falling back to a unbounded + * single threaded threadpool that will take over dispatching to the primary pool. + * + */ +public class CachedThreadPoolWithFallback implements AutoCloseable, Executor { + private final ExecutorService primary; + private final ExecutorService secondary; + public CachedThreadPoolWithFallback(String baseName, int corePoolSize, int maximumPoolSize, long keepAlimeTime, TimeUnit timeUnit) { + primary = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAlimeTime, timeUnit, + new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory(baseName + ".primary")); + secondary = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory(baseName + ".secondary")); + } + @Override + public void execute(Runnable command) { + try { + primary.execute(command); + } catch (RejectedExecutionException e1) { + secondary.execute(() -> retryForever(command)); + } + } + private void retryForever(Runnable command) { + while (true) { + try { + primary.execute(command); + return; + } catch (RejectedExecutionException rejected) { + try { + Thread.sleep(1); + } catch (InterruptedException silenced) { } + } + } + } + + @Override + public void close() { + secondary.shutdown(); + join(secondary); + primary.shutdown(); + join(primary); + } + private static void join(ExecutorService executor) { + while (true) { + try { + if (executor.awaitTermination(60, TimeUnit.SECONDS)) { + return; + } + } catch (InterruptedException e) {} + } + } +} diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java new file mode 100644 index 00000000000..52e17631a34 --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/concurrent/CachedThreadPoolWithFallbackTest.java @@ -0,0 +1,43 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.concurrent; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +public class CachedThreadPoolWithFallbackTest { + private static void countAndBlock(AtomicLong counter, long waitLimit) { + counter.incrementAndGet(); + try { + synchronized (counter) { + while (counter.get() < waitLimit) { + counter.wait(); + } + } + } catch (InterruptedException e) {} + } + + @Test + public void testThatTaskAreQueued() throws InterruptedException { + CachedThreadPoolWithFallback executor = new CachedThreadPoolWithFallback("test", 1, 30, 1, TimeUnit.SECONDS); + AtomicLong counter = new AtomicLong(0); + for (int i = 0; i < 1000; i++) { + executor.execute(() -> countAndBlock(counter, 100)); + } + while (counter.get() < 30) { + Thread.sleep(1); + } + Thread.sleep(1); + assertEquals(30L, counter.get()); + counter.set(100); + synchronized (counter) { + counter.notifyAll(); + } + executor.close(); + assertEquals(1070L, counter.get()); + } +} |