aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 14:23:58 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 14:23:58 +0000
commitd7d87a8e45e0dace606493dfed2f903a614b76f7 (patch)
tree15672623f840260a481309b912d363c74ee80c88 /jrt
parentf17690d9533eef0f2471577184369548b0f3b380 (diff)
Use a global bounded cached tread pool as primary, and a single threaded one for fallback.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java116
-rw-r--r--jrt/src/com/yahoo/jrt/Supervisor.java17
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java2
3 files changed, 109 insertions, 26 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index 4f8fffecbac..716801b4d0e 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -3,53 +3,119 @@ package com.yahoo.jrt;
import com.yahoo.concurrent.ThreadFactoryFactory;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
+import java.util.concurrent.atomic.AtomicReference;
-class Connector {
- private static final Logger log = Logger.getLogger(Connector.class.getName());
+class Connector implements AutoCloseable {
- private final ExecutorService executor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector"));
+ private static final Object globalLock = new Object();
+ private static ExecutorService globalPrimaryExecutor = null;
+ private static ExecutorService globalFallbackExecutor = null;
+ private static long usages = 0;
+
+ private static class ExecutorWithFallback implements Executor {
+ private final Executor primary;
+ private final Executor secondary;
+ ExecutorWithFallback(Executor primary, Executor secondary) {
+ this.primary = primary;
+ this.secondary = secondary;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ primary.execute(command);
+ } catch (RejectedExecutionException e1) {
+ secondary.execute(() -> retryForEver(command));
+ }
+ }
+ private void retryForEver(Runnable command) {
+ while (true) {
+ try {
+ primary.execute(command);
+ return;
+ } catch (RejectedExecutionException rejected) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException silenced) { }
+ }
+ }
+ }
+ }
+
+ private static ExecutorWithFallback acquire() {
+ synchronized (globalLock) {
+ if (globalPrimaryExecutor == null) {
+ globalPrimaryExecutor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.primary"));
+ globalFallbackExecutor = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.fallback"));
+ }
+ usages ++;
+ return new ExecutorWithFallback(globalPrimaryExecutor, globalFallbackExecutor);
+ }
+ }
+
+ private static void release(ExecutorWithFallback executor) {
+ synchronized (globalLock) {
+ if (executor.primary != globalPrimaryExecutor) {
+ throw new IllegalStateException("primary executor " + executor.primary + " != " + globalPrimaryExecutor);
+ }
+ if (executor.secondary != globalFallbackExecutor) {
+ throw new IllegalStateException("secondary executor " + executor.secondary + " != " + globalFallbackExecutor);
+ }
+ usages--;
+ if (usages == 0) {
+ globalPrimaryExecutor.shutdown();
+ globalFallbackExecutor.shutdown();
+ while (true) {
+ try {
+ if (globalPrimaryExecutor != null && globalPrimaryExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ globalPrimaryExecutor = null;
+ }
+ if (globalFallbackExecutor != null && globalFallbackExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ globalFallbackExecutor = null;
+ }
+ if (globalFallbackExecutor == null && globalFallbackExecutor == null) {
+ return;
+ }
+ } catch (InterruptedException e) {}
+ }
+ }
+ }
+ }
+
+ private final AtomicReference<ExecutorWithFallback> executor;
+
+ Connector() {
+ executor = new AtomicReference<>(acquire());
+ }
private void connect(Connection conn) {
conn.transportThread().addConnection(conn.connect());
}
public void connectLater(Connection conn) {
- long delay = 1;
- while (!executor.isShutdown()) {
+ Executor executor = this.executor.get();
+ if (executor != null) {
try {
executor.execute(() -> connect(conn));
return;
} catch (RejectedExecutionException ignored) {
- log.warning("Failed posting connect task for " + conn + ". Trying again in " + delay + "ms.");
- try {
- Thread.sleep(delay);
- } catch (InterruptedException silenced) {}
- delay = Math.min(delay * 2, 100);
}
}
conn.transportThread().addConnection(conn);
}
- public Connector shutdown() {
- executor.shutdown();
- return this;
- }
-
- public void join() {
- while (true) {
- try {
- if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
- return;
- }
- } catch (InterruptedException e) {}
+ public void close() {
+ ExecutorWithFallback toShutdown = executor.getAndSet(null);
+ if (toShutdown != null) {
+ release(toShutdown);
}
}
}
diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java
index 92c0208b493..d4168e97743 100644
--- a/jrt/src/com/yahoo/jrt/Supervisor.java
+++ b/jrt/src/com/yahoo/jrt/Supervisor.java
@@ -155,6 +155,23 @@ public class Supervisor {
}
/**
+ * Convenience method for connecting to a peer, invoking a method
+ * and disconnecting.
+ *
+ * @param spec the address to connect to
+ * @param req the invocation request
+ * @param timeout request timeout in seconds
+ **/
+ public void invokeBatch(Spec spec, Request req, double timeout) {
+ Target target = connect(spec);
+ try {
+ target.invokeSync(req, timeout);
+ } finally {
+ target.close();
+ }
+ }
+
+ /**
* This method is invoked when a new target is created
*
* @param target the target
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index f0a03362349..8abd3942a39 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -173,7 +173,7 @@ public class Transport {
* @return this object, to enable chaining with join
**/
public Transport shutdown() {
- connector.shutdown().join();
+ connector.close();
for (TransportThread thread: threads) {
thread.shutdown();
}