aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-05 13:19:00 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-05 13:19:00 +0000
commitdd5388a31d85fdf2442db15b3ccfffef86392f67 (patch)
treeed3082e5e1586655778478349f1c9581b834ed55 /jrt
parent169bb2f57a915c7bb5586ce788dacda44de769f8 (diff)
Use a cached threadpool for execution of connect. To avoid failing connect blocking other connects.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java59
1 files changed, 25 insertions, 34 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index 4c83a2884bd..61143e8a78b 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -1,55 +1,47 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
+import com.yahoo.concurrent.ThreadFactoryFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
class Connector {
- private class Run implements Runnable {
- public void run() {
- try {
- Connector.this.run();
- } catch (Throwable problem) {
- parent.handleFailure(problem, Connector.this);
- }
+ private final Transport parent;
+ private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector"));
+ private boolean done = false;
+
+ private void connect(Connection conn) {
+ try {
+ conn.transportThread().addConnection(conn.connect());
+ } catch (Throwable problem) {
+ parent.handleFailure(problem, Connector.this);
}
}
- private Thread thread = new Thread(new Run(), "<jrt-connector>");
- private Transport parent;
- private ThreadQueue connectQueue = new ThreadQueue();
- private boolean done = false;
- private boolean exit = false;
-
public Connector(Transport parent) {
this.parent = parent;
- thread.setDaemon(true);
- thread.start();
}
- public void connectLater(Connection c) {
- if ( ! connectQueue.enqueue(c)) {
- c.transportThread().addConnection(c);
+ public void connectLater(Connection conn) {
+ try {
+ executor.execute(() -> connect(conn));
+ } catch (RejectedExecutionException e) {
+ conn.transportThread().addConnection(conn);
}
+
}
- private void run() {
- try {
- while (true) {
- Connection conn = (Connection) connectQueue.dequeue();
- conn.transportThread().addConnection(conn.connect());
- }
- } catch (EndOfQueueException e) {}
+ public Connector shutdown() {
+ executor.shutdown();
+ join();
synchronized (this) {
done = true;
notifyAll();
- while (!exit) {
- try { wait(); } catch (InterruptedException x) {}
- }
}
- }
-
- public Connector shutdown() {
- connectQueue.close();
return this;
}
@@ -60,7 +52,6 @@ class Connector {
}
public synchronized Connector exit() {
- exit = true;
notifyAll();
return this;
}
@@ -68,7 +59,7 @@ class Connector {
public void join() {
while (true) {
try {
- thread.join();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
return;
} catch (InterruptedException e) {}
}