diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-05 13:19:00 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-05 13:19:00 +0000 |
commit | dd5388a31d85fdf2442db15b3ccfffef86392f67 (patch) | |
tree | ed3082e5e1586655778478349f1c9581b834ed55 | |
parent | 169bb2f57a915c7bb5586ce788dacda44de769f8 (diff) |
Use a cached threadpool for execution of connect. To avoid failing connect blocking other connects.
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connector.java | 59 |
1 files changed, 25 insertions, 34 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index 4c83a2884bd..61143e8a78b 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -1,55 +1,47 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt; +import com.yahoo.concurrent.ThreadFactoryFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; class Connector { - private class Run implements Runnable { - public void run() { - try { - Connector.this.run(); - } catch (Throwable problem) { - parent.handleFailure(problem, Connector.this); - } + private final Transport parent; + private final ExecutorService executor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector")); + private boolean done = false; + + private void connect(Connection conn) { + try { + conn.transportThread().addConnection(conn.connect()); + } catch (Throwable problem) { + parent.handleFailure(problem, Connector.this); } } - private Thread thread = new Thread(new Run(), "<jrt-connector>"); - private Transport parent; - private ThreadQueue connectQueue = new ThreadQueue(); - private boolean done = false; - private boolean exit = false; - public Connector(Transport parent) { this.parent = parent; - thread.setDaemon(true); - thread.start(); } - public void connectLater(Connection c) { - if ( ! connectQueue.enqueue(c)) { - c.transportThread().addConnection(c); + public void connectLater(Connection conn) { + try { + executor.execute(() -> connect(conn)); + } catch (RejectedExecutionException e) { + conn.transportThread().addConnection(conn); } + } - private void run() { - try { - while (true) { - Connection conn = (Connection) connectQueue.dequeue(); - conn.transportThread().addConnection(conn.connect()); - } - } catch (EndOfQueueException e) {} + public Connector shutdown() { + executor.shutdown(); + join(); synchronized (this) { done = true; notifyAll(); - while (!exit) { - try { wait(); } catch (InterruptedException x) {} - } } - } - - public Connector shutdown() { - connectQueue.close(); return this; } @@ -60,7 +52,6 @@ class Connector { } public synchronized Connector exit() { - exit = true; notifyAll(); return this; } @@ -68,7 +59,7 @@ class Connector { public void join() { while (true) { try { - thread.join(); + executor.awaitTermination(60, TimeUnit.SECONDS); return; } catch (InterruptedException e) {} } |