aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 12:07:42 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 12:07:42 +0000
commitf17690d9533eef0f2471577184369548b0f3b380 (patch)
treee127fc3511babdb41f1b20cd45ccd17813e56ae2 /jrt
parentfcb443e2aeaed0e592f8ba02402fb419202edc07 (diff)
Adding more threads does not work when the Q is unbound. Use a SynchronousQ instead.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java23
-rw-r--r--jrt/src/com/yahoo/jrt/Supervisor.java31
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java5
3 files changed, 19 insertions, 40 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index 57fad5a163d..4f8fffecbac 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -4,15 +4,17 @@ package com.yahoo.jrt;
import com.yahoo.concurrent.ThreadFactoryFactory;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
class Connector {
+ private static final Logger log = Logger.getLogger(Connector.class.getName());
private final ExecutorService executor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ new SynchronousQueue<>(),
ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector"));
private void connect(Connection conn) {
@@ -20,11 +22,20 @@ class Connector {
}
public void connectLater(Connection conn) {
- try {
- executor.execute(() -> connect(conn));
- } catch (RejectedExecutionException e) {
- conn.transportThread().addConnection(conn);
+ long delay = 1;
+ while (!executor.isShutdown()) {
+ try {
+ executor.execute(() -> connect(conn));
+ return;
+ } catch (RejectedExecutionException ignored) {
+ log.warning("Failed posting connect task for " + conn + ". Trying again in " + delay + "ms.");
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException silenced) {}
+ delay = Math.min(delay * 2, 100);
+ }
}
+ conn.transportThread().addConnection(conn);
}
public Connector shutdown() {
diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java
index 09360c2da7b..92c0208b493 100644
--- a/jrt/src/com/yahoo/jrt/Supervisor.java
+++ b/jrt/src/com/yahoo/jrt/Supervisor.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
-
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -103,19 +102,6 @@ public class Supervisor {
}
/**
- * Remove a method from the set of methods held by this Supervisor
- *
- * @param methodName name of the method to remove
- **/
- public void removeMethod(String methodName) {
- synchronized (methodMapLock) {
- HashMap<String, Method> newMap = new HashMap<>(methodMap());
- newMap.remove(methodName);
- methodMap.setRelease(newMap);
- }
- }
-
- /**
* Remove a method from the set of methods held by this
* Supervisor. Use this if you know exactly which method to remove
* and not only the name.
@@ -169,23 +155,6 @@ public class Supervisor {
}
/**
- * Convenience method for connecting to a peer, invoking a method
- * and disconnecting.
- *
- * @param spec the address to connect to
- * @param req the invocation request
- * @param timeout request timeout in seconds
- **/
- public void invokeBatch(Spec spec, Request req, double timeout) {
- Target target = connect(spec);
- try {
- target.invokeSync(req, timeout);
- } finally {
- target.close();
- }
- }
-
- /**
* This method is invoked when a new target is created
*
* @param target the target
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 6f5a381fd6b..f0a03362349 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -4,7 +4,6 @@ package com.yahoo.jrt;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@@ -19,7 +18,7 @@ import java.util.logging.Logger;
**/
public class Transport {
- private static Logger log = Logger.getLogger(Transport.class.getName());
+ private static final Logger log = Logger.getLogger(Transport.class.getName());
private final FatalErrorHandler fatalHandler; // NB: this must be set first
private final CryptoEngine cryptoEngine;
@@ -28,7 +27,7 @@ public class Transport {
private final AtomicInteger runCnt;
private final TransportMetrics metrics = TransportMetrics.getInstance();
- private final ArrayList<TransportThread> threads = new ArrayList<TransportThread>();
+ private final ArrayList<TransportThread> threads = new ArrayList<>();
private final Random rnd = new Random();
/**