diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-05 22:13:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-05 22:13:47 +0100 |
commit | 01e534a73f18514a5e924d2a9d8220272ad37521 (patch) | |
tree | 0d52a1144de0e07be44b9665c59a5d14e485af5b | |
parent | 0553868014994ced17c7da44b5ae133f7b03a1f9 (diff) | |
parent | 639ddc82c6f7769b5d23aa57df8877f9a2354741 (diff) |
Merge pull request #12077 from vespa-engine/balder/use-multiple-connect-threads
Use a cached threadpool for execution of connect. To avoid failing co…
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connector.java | 72 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 5 |
2 files changed, 22 insertions, 55 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index 4c83a2884bd..0fdb6cfa380 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -1,75 +1,43 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt; +import com.yahoo.concurrent.ThreadFactoryFactory; -class Connector { +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; - private class Run implements Runnable { - public void run() { - try { - Connector.this.run(); - } catch (Throwable problem) { - parent.handleFailure(problem, Connector.this); - } - } - } - - private Thread thread = new Thread(new Run(), "<jrt-connector>"); - private Transport parent; - private ThreadQueue connectQueue = new ThreadQueue(); - private boolean done = false; - private boolean exit = false; +class Connector { - public Connector(Transport parent) { - this.parent = parent; - thread.setDaemon(true); - thread.start(); - } + private final ExecutorService executor = new ThreadPoolExecutor(1, 8, 10L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector")); - public void connectLater(Connection c) { - if ( ! connectQueue.enqueue(c)) { - c.transportThread().addConnection(c); - } + private void connect(Connection conn) { + conn.transportThread().addConnection(conn.connect()); } - private void run() { + public void connectLater(Connection conn) { try { - while (true) { - Connection conn = (Connection) connectQueue.dequeue(); - conn.transportThread().addConnection(conn.connect()); - } - } catch (EndOfQueueException e) {} - synchronized (this) { - done = true; - notifyAll(); - while (!exit) { - try { wait(); } catch (InterruptedException x) {} - } + executor.execute(() -> connect(conn)); + } catch (RejectedExecutionException e) { + conn.transportThread().addConnection(conn); } } public Connector shutdown() { - connectQueue.close(); - return this; - } - - public synchronized void waitDone() { - while (!done) { - try { wait(); } catch (InterruptedException x) {} - } - } - - public synchronized Connector exit() { - exit = true; - notifyAll(); + executor.shutdown(); return this; } public void join() { while (true) { try { - thread.join(); - return; + if (executor.awaitTermination(60, TimeUnit.SECONDS)) { + return; + } } catch (InterruptedException e) {} } } diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index f4eb1acd096..ad42409c48a 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -46,7 +46,7 @@ public class Transport { this.fatalHandler = fatalHandler; // NB: this must be set first } this.cryptoEngine = cryptoEngine; - connector = new Connector(this); + connector = new Connector(); worker = new Worker(this); runCnt = new AtomicInteger(numThreads); for (int i = 0; i < numThreads; ++i) { @@ -162,7 +162,7 @@ public class Transport { * @return this object, to enable chaining with join **/ public Transport shutdown() { - connector.shutdown().waitDone(); + connector.shutdown().join(); for (TransportThread thread: threads) { thread.shutdown(); } @@ -181,7 +181,6 @@ public class Transport { void notifyDone(TransportThread self) { if (runCnt.decrementAndGet() == 0) { worker.shutdown().join(); - connector.exit().join(); try { cryptoEngine.close(); } catch (Exception e) {} } } |