diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 20:59:13 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 20:59:13 +0000 |
commit | 6c3f5eedc1d288cc1e5e71a2cbd7da02d5415016 (patch) | |
tree | c80e8d72c421cd2fa02abb5ce0456dd3fe43405f | |
parent | e8f646592fce73288f40bf854bf274e7dca0f475 (diff) |
Factor out generic thread pool to vespajlib.
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connector.java | 88 |
1 files changed, 13 insertions, 75 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index b0eeffda469..10f2b3742f2 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -1,104 +1,41 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt; -import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.concurrent.CachedThreadPoolWithFallback; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; class Connector implements AutoCloseable { - //TODO Move to vespajlib as utility as java does not have what we want. private static final Object globalLock = new Object(); - private static ExecutorService globalPrimaryExecutor = null; - private static ExecutorService globalFallbackExecutor = null; + private static CachedThreadPoolWithFallback globalExecutor = null; private static long usages = 0; - private static class ExecutorWithFallback implements Executor { - private final Executor primary; - private final Executor secondary; - ExecutorWithFallback(Executor primary, Executor secondary) { - this.primary = primary; - this.secondary = secondary; - } - - @Override - public void execute(Runnable command) { - try { - primary.execute(command); - } catch (RejectedExecutionException e1) { - secondary.execute(() -> retryForever(command)); - } - } - private void retryForever(Runnable command) { - while (true) { - try { - primary.execute(command); - return; - } catch (RejectedExecutionException rejected) { - try { - Thread.sleep(1); - } catch (InterruptedException silenced) { } - } - } - } - } - - private static ExecutorWithFallback acquire() { + private static CachedThreadPoolWithFallback acquire() { synchronized (globalLock) { - if (globalPrimaryExecutor == null) { - if (globalFallbackExecutor != null) { - throw new IllegalStateException("fallback executor must be null !"); - } - if (usages != 0) { - throw new IllegalStateException("usages " + usages + " != 0"); - } - globalPrimaryExecutor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS, - new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.primary")); - globalFallbackExecutor = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.fallback")); + if (globalExecutor == null) { + globalExecutor = new CachedThreadPoolWithFallback("jrt.connector", 1, 64, 1L, TimeUnit.SECONDS); } usages++; - return new ExecutorWithFallback(globalPrimaryExecutor, globalFallbackExecutor); + return globalExecutor; } } - private static void join(ExecutorService executor) { - while (true) { - try { - if (executor.awaitTermination(60, TimeUnit.SECONDS)) { - return; - } - } catch (InterruptedException e) {} - } - } - - private static void release(ExecutorWithFallback executor) { + private static void release(CachedThreadPoolWithFallback executor) { synchronized (globalLock) { - if (executor.primary != globalPrimaryExecutor) { - throw new IllegalStateException("primary executor " + executor.primary + " != " + globalPrimaryExecutor); - } - if (executor.secondary != globalFallbackExecutor) { - throw new IllegalStateException("secondary executor " + executor.secondary + " != " + globalFallbackExecutor); - } + assert executor == globalExecutor; usages--; if (usages == 0) { - globalPrimaryExecutor.shutdown(); - globalFallbackExecutor.shutdown(); - join(globalPrimaryExecutor); - globalPrimaryExecutor = null; - join(globalFallbackExecutor); - globalFallbackExecutor = null; + globalExecutor.close(); + globalExecutor = null; } } } - private final AtomicReference<ExecutorWithFallback> executor; + private final AtomicReference<CachedThreadPoolWithFallback> executor; Connector() { executor = new AtomicReference<>(acquire()); @@ -120,8 +57,9 @@ class Connector implements AutoCloseable { conn.transportThread().addConnection(conn); } + @Override public void close() { - ExecutorWithFallback toShutdown = executor.getAndSet(null); + CachedThreadPoolWithFallback toShutdown = executor.getAndSet(null); if (toShutdown != null) { release(toShutdown); } |