diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 12:07:42 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-25 12:07:42 +0000 |
commit | f17690d9533eef0f2471577184369548b0f3b380 (patch) | |
tree | e127fc3511babdb41f1b20cd45ccd17813e56ae2 | |
parent | fcb443e2aeaed0e592f8ba02402fb419202edc07 (diff) |
Adding more threads does not work when the Q is unbound. Use a SynchronousQ instead.
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connector.java | 23 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Supervisor.java | 31 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 5 |
3 files changed, 19 insertions, 40 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index 57fad5a163d..4f8fffecbac 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -4,15 +4,17 @@ package com.yahoo.jrt; import com.yahoo.concurrent.ThreadFactoryFactory; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; class Connector { + private static final Logger log = Logger.getLogger(Connector.class.getName()); private final ExecutorService executor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), + new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector")); private void connect(Connection conn) { @@ -20,11 +22,20 @@ class Connector { } public void connectLater(Connection conn) { - try { - executor.execute(() -> connect(conn)); - } catch (RejectedExecutionException e) { - conn.transportThread().addConnection(conn); + long delay = 1; + while (!executor.isShutdown()) { + try { + executor.execute(() -> connect(conn)); + return; + } catch (RejectedExecutionException ignored) { + log.warning("Failed posting connect task for " + conn + ". Trying again in " + delay + "ms."); + try { + Thread.sleep(delay); + } catch (InterruptedException silenced) {} + delay = Math.min(delay * 2, 100); + } } + conn.transportThread().addConnection(conn); } public Connector shutdown() { diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java index 09360c2da7b..92c0208b493 100644 --- a/jrt/src/com/yahoo/jrt/Supervisor.java +++ b/jrt/src/com/yahoo/jrt/Supervisor.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt; - import java.util.HashMap; import java.util.concurrent.atomic.AtomicReference; @@ -103,19 +102,6 @@ public class Supervisor { } /** - * Remove a method from the set of methods held by this Supervisor - * - * @param methodName name of the method to remove - **/ - public void removeMethod(String methodName) { - synchronized (methodMapLock) { - HashMap<String, Method> newMap = new HashMap<>(methodMap()); - newMap.remove(methodName); - methodMap.setRelease(newMap); - } - } - - /** * Remove a method from the set of methods held by this * Supervisor. Use this if you know exactly which method to remove * and not only the name. @@ -169,23 +155,6 @@ public class Supervisor { } /** - * Convenience method for connecting to a peer, invoking a method - * and disconnecting. - * - * @param spec the address to connect to - * @param req the invocation request - * @param timeout request timeout in seconds - **/ - public void invokeBatch(Spec spec, Request req, double timeout) { - Target target = connect(spec); - try { - target.invokeSync(req, timeout); - } finally { - target.close(); - } - } - - /** * This method is invoked when a new target is created * * @param target the target diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 6f5a381fd6b..f0a03362349 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -4,7 +4,6 @@ package com.yahoo.jrt; import java.nio.channels.SocketChannel; import java.util.ArrayList; -import java.util.Iterator; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -19,7 +18,7 @@ import java.util.logging.Logger; **/ public class Transport { - private static Logger log = Logger.getLogger(Transport.class.getName()); + private static final Logger log = Logger.getLogger(Transport.class.getName()); private final FatalErrorHandler fatalHandler; // NB: this must be set first private final CryptoEngine cryptoEngine; @@ -28,7 +27,7 @@ public class Transport { private final AtomicInteger runCnt; private final TransportMetrics metrics = TransportMetrics.getInstance(); - private final ArrayList<TransportThread> threads = new ArrayList<TransportThread>(); + private final ArrayList<TransportThread> threads = new ArrayList<>(); private final Random rnd = new Random(); /** |