diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 14:23:58 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 14:23:58 +0000 |
commit | d7d87a8e45e0dace606493dfed2f903a614b76f7 (patch) | |
tree | 15672623f840260a481309b912d363c74ee80c88 /jrt/src | |
parent | f17690d9533eef0f2471577184369548b0f3b380 (diff) |
Use a global bounded cached tread pool as primary, and a single threaded one for fallback.
Diffstat (limited to 'jrt/src')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connector.java | 116 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Supervisor.java | 17 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 2 |
3 files changed, 109 insertions, 26 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index 4f8fffecbac..716801b4d0e 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -3,53 +3,119 @@ package com.yahoo.jrt; import com.yahoo.concurrent.ThreadFactoryFactory; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; +import java.util.concurrent.atomic.AtomicReference; -class Connector { - private static final Logger log = Logger.getLogger(Connector.class.getName()); +class Connector implements AutoCloseable { - private final ExecutorService executor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector")); + private static final Object globalLock = new Object(); + private static ExecutorService globalPrimaryExecutor = null; + private static ExecutorService globalFallbackExecutor = null; + private static long usages = 0; + + private static class ExecutorWithFallback implements Executor { + private final Executor primary; + private final Executor secondary; + ExecutorWithFallback(Executor primary, Executor secondary) { + this.primary = primary; + this.secondary = secondary; + } + + @Override + public void execute(Runnable command) { + try { + primary.execute(command); + } catch (RejectedExecutionException e1) { + secondary.execute(() -> retryForEver(command)); + } + } + private void retryForEver(Runnable command) { + while (true) { + try { + primary.execute(command); + return; + } catch (RejectedExecutionException rejected) { + try { + Thread.sleep(1); + } catch (InterruptedException silenced) { } + } + } + } + } + + private static ExecutorWithFallback acquire() { + synchronized (globalLock) { + if (globalPrimaryExecutor == null) { + globalPrimaryExecutor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.primary")); + globalFallbackExecutor = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.fallback")); + } + usages ++; + return new ExecutorWithFallback(globalPrimaryExecutor, globalFallbackExecutor); + } + } + + private static void release(ExecutorWithFallback executor) { + synchronized (globalLock) { + if (executor.primary != globalPrimaryExecutor) { + throw new IllegalStateException("primary executor " + executor.primary + " != " + globalPrimaryExecutor); + } + if (executor.secondary != globalFallbackExecutor) { + throw new IllegalStateException("secondary executor " + executor.secondary + " != " + globalFallbackExecutor); + } + usages--; + if (usages == 0) { + globalPrimaryExecutor.shutdown(); + globalFallbackExecutor.shutdown(); + while (true) { + try { + if (globalPrimaryExecutor != null && globalPrimaryExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + globalPrimaryExecutor = null; + } + if (globalFallbackExecutor != null && globalFallbackExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + globalFallbackExecutor = null; + } + if (globalFallbackExecutor == null && globalFallbackExecutor == null) { + return; + } + } catch (InterruptedException e) {} + } + } + } + } + + private final AtomicReference<ExecutorWithFallback> executor; + + Connector() { + executor = new AtomicReference<>(acquire()); + } private void connect(Connection conn) { conn.transportThread().addConnection(conn.connect()); } public void connectLater(Connection conn) { - long delay = 1; - while (!executor.isShutdown()) { + Executor executor = this.executor.get(); + if (executor != null) { try { executor.execute(() -> connect(conn)); return; } catch (RejectedExecutionException ignored) { - log.warning("Failed posting connect task for " + conn + ". Trying again in " + delay + "ms."); - try { - Thread.sleep(delay); - } catch (InterruptedException silenced) {} - delay = Math.min(delay * 2, 100); } } conn.transportThread().addConnection(conn); } - public Connector shutdown() { - executor.shutdown(); - return this; - } - - public void join() { - while (true) { - try { - if (executor.awaitTermination(60, TimeUnit.SECONDS)) { - return; - } - } catch (InterruptedException e) {} + public void close() { + ExecutorWithFallback toShutdown = executor.getAndSet(null); + if (toShutdown != null) { + release(toShutdown); } } } diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java index 92c0208b493..d4168e97743 100644 --- a/jrt/src/com/yahoo/jrt/Supervisor.java +++ b/jrt/src/com/yahoo/jrt/Supervisor.java @@ -155,6 +155,23 @@ public class Supervisor { } /** + * Convenience method for connecting to a peer, invoking a method + * and disconnecting. + * + * @param spec the address to connect to + * @param req the invocation request + * @param timeout request timeout in seconds + **/ + public void invokeBatch(Spec spec, Request req, double timeout) { + Target target = connect(spec); + try { + target.invokeSync(req, timeout); + } finally { + target.close(); + } + } + + /** * This method is invoked when a new target is created * * @param target the target diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index f0a03362349..8abd3942a39 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -173,7 +173,7 @@ public class Transport { * @return this object, to enable chaining with join **/ public Transport shutdown() { - connector.shutdown().join(); + connector.close(); for (TransportThread thread: threads) { thread.shutdown(); } |