aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-11-30 09:12:36 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-11-30 09:12:36 +0000
commitb54e1aecb0317e7381fbba262bcb29d55af85923 (patch)
tree567cba69faa3c401a5bfba7b3094eccd44c2ce24 /jrt
parent8001f2c4cf42344178cc4ecb0c695ef701b2bbce (diff)
A configurable limit for waking up fnet thread
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java27
-rw-r--r--jrt/src/com/yahoo/jrt/TransportThread.java6
2 files changed, 25 insertions, 8 deletions
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 003e40b8aa9..71784703fc2 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -27,6 +27,7 @@ public class Transport {
private final Worker worker;
private final AtomicInteger runCnt;
private final boolean tcpNoDelay;
+ private final int wakeupTriggerCount;
private final TransportMetrics metrics = TransportMetrics.getInstance();
private final ArrayList<TransportThread> threads = new ArrayList<>();
@@ -42,12 +43,14 @@ public class Transport {
* @param fatalHandler fatal error handler
* @param cryptoEngine crypto engine to use
* @param numThreads number of {@link TransportThread}s.
+ * @param wakeupTriggerCount number write events in Q before waking thread up
**/
- public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) {
+ public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay, int wakeupTriggerCount) {
this.name = name;
this.fatalHandler = fatalHandler; // NB: this must be set first
this.cryptoEngine = cryptoEngine;
this.tcpNoDelay = tcpNoDelay;
+ this.wakeupTriggerCount = Math.max(1, wakeupTriggerCount);
connector = new Connector();
worker = new Worker(this);
runCnt = new AtomicInteger(numThreads);
@@ -55,10 +58,23 @@ public class Transport {
threads.add(new TransportThread(this, i));
}
}
- public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); }
- public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); }
- public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); }
- public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); }
+ public Transport(String name, CryptoEngine cryptoEngine, int numThreads, int wakeupTriggerCount) {
+ this(name, null, cryptoEngine, numThreads, true, wakeupTriggerCount);
+ }
+ public Transport(String name, CryptoEngine cryptoEngine, int numThreads) {
+ this(name, null, cryptoEngine, numThreads, true, 1);
+ }
+ public Transport(String name, int numThreads, int wakeupTriggerCount) {
+ this(name, null, CryptoEngine.createDefault(), numThreads, true, wakeupTriggerCount);
+ }
+ public Transport(String name, int numThreads, boolean tcpNoDelay, int wakeupTriggerCount) {
+ this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay, wakeupTriggerCount); }
+ public Transport(String name, int numThreads) {
+ this(name, null, CryptoEngine.createDefault(), numThreads, true, 1);
+ }
+ public Transport(String name) {
+ this(name, null, CryptoEngine.createDefault(), 1, true, 1);
+ }
// Only for testing
public Transport() { this("default"); }
@@ -72,6 +88,7 @@ public class Transport {
}
boolean getTcpNoDelay() { return tcpNoDelay; }
+ int getWakeupTriggerCount() { return wakeupTriggerCount; }
String getName() { return name; }
diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java
index 8f158161888..e42534d12fe 100644
--- a/jrt/src/com/yahoo/jrt/TransportThread.java
+++ b/jrt/src/com/yahoo/jrt/TransportThread.java
@@ -120,15 +120,15 @@ public class TransportThread {
}
private boolean postCommand(Runnable cmd) {
- boolean wakeup;
+ int qlen;
synchronized (this) {
if (state == CLOSED) {
return false;
}
- wakeup = queue.isEmpty();
queue.enqueue(cmd);
+ qlen = queue.size();
}
- if (wakeup) {
+ if (qlen == parent.getWakeupTriggerCount()) {
selector.wakeup();
}
return true;