summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-05 22:13:47 +0100
committerGitHub <noreply@github.com>2020-02-05 22:13:47 +0100
commit01e534a73f18514a5e924d2a9d8220272ad37521 (patch)
tree0d52a1144de0e07be44b9665c59a5d14e485af5b
parent0553868014994ced17c7da44b5ae133f7b03a1f9 (diff)
parent639ddc82c6f7769b5d23aa57df8877f9a2354741 (diff)
Merge pull request #12077 from vespa-engine/balder/use-multiple-connect-threads
Use a cached threadpool for execution of connect. To avoid failing co…
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java72
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java5
2 files changed, 22 insertions, 55 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index 4c83a2884bd..0fdb6cfa380 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -1,75 +1,43 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
+import com.yahoo.concurrent.ThreadFactoryFactory;
-class Connector {
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
- private class Run implements Runnable {
- public void run() {
- try {
- Connector.this.run();
- } catch (Throwable problem) {
- parent.handleFailure(problem, Connector.this);
- }
- }
- }
-
- private Thread thread = new Thread(new Run(), "<jrt-connector>");
- private Transport parent;
- private ThreadQueue connectQueue = new ThreadQueue();
- private boolean done = false;
- private boolean exit = false;
+class Connector {
- public Connector(Transport parent) {
- this.parent = parent;
- thread.setDaemon(true);
- thread.start();
- }
+ private final ExecutorService executor = new ThreadPoolExecutor(1, 8, 10L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector"));
- public void connectLater(Connection c) {
- if ( ! connectQueue.enqueue(c)) {
- c.transportThread().addConnection(c);
- }
+ private void connect(Connection conn) {
+ conn.transportThread().addConnection(conn.connect());
}
- private void run() {
+ public void connectLater(Connection conn) {
try {
- while (true) {
- Connection conn = (Connection) connectQueue.dequeue();
- conn.transportThread().addConnection(conn.connect());
- }
- } catch (EndOfQueueException e) {}
- synchronized (this) {
- done = true;
- notifyAll();
- while (!exit) {
- try { wait(); } catch (InterruptedException x) {}
- }
+ executor.execute(() -> connect(conn));
+ } catch (RejectedExecutionException e) {
+ conn.transportThread().addConnection(conn);
}
}
public Connector shutdown() {
- connectQueue.close();
- return this;
- }
-
- public synchronized void waitDone() {
- while (!done) {
- try { wait(); } catch (InterruptedException x) {}
- }
- }
-
- public synchronized Connector exit() {
- exit = true;
- notifyAll();
+ executor.shutdown();
return this;
}
public void join() {
while (true) {
try {
- thread.join();
- return;
+ if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
+ return;
+ }
} catch (InterruptedException e) {}
}
}
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index f4eb1acd096..ad42409c48a 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -46,7 +46,7 @@ public class Transport {
this.fatalHandler = fatalHandler; // NB: this must be set first
}
this.cryptoEngine = cryptoEngine;
- connector = new Connector(this);
+ connector = new Connector();
worker = new Worker(this);
runCnt = new AtomicInteger(numThreads);
for (int i = 0; i < numThreads; ++i) {
@@ -162,7 +162,7 @@ public class Transport {
* @return this object, to enable chaining with join
**/
public Transport shutdown() {
- connector.shutdown().waitDone();
+ connector.shutdown().join();
for (TransportThread thread: threads) {
thread.shutdown();
}
@@ -181,7 +181,6 @@ public class Transport {
void notifyDone(TransportThread self) {
if (runCnt.decrementAndGet() == 0) {
worker.shutdown().join();
- connector.exit().join();
try { cryptoEngine.close(); } catch (Exception e) {}
}
}