diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-30 09:12:36 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-30 09:12:36 +0000 |
commit | b54e1aecb0317e7381fbba262bcb29d55af85923 (patch) | |
tree | 567cba69faa3c401a5bfba7b3094eccd44c2ce24 /jrt/src | |
parent | 8001f2c4cf42344178cc4ecb0c695ef701b2bbce (diff) |
A configurable limit for waking up fnet thread
Diffstat (limited to 'jrt/src')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 27 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/TransportThread.java | 6 |
2 files changed, 25 insertions, 8 deletions
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 003e40b8aa9..71784703fc2 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -27,6 +27,7 @@ public class Transport { private final Worker worker; private final AtomicInteger runCnt; private final boolean tcpNoDelay; + private final int wakeupTriggerCount; private final TransportMetrics metrics = TransportMetrics.getInstance(); private final ArrayList<TransportThread> threads = new ArrayList<>(); @@ -42,12 +43,14 @@ public class Transport { * @param fatalHandler fatal error handler * @param cryptoEngine crypto engine to use * @param numThreads number of {@link TransportThread}s. + * @param wakeupTriggerCount number write events in Q before waking thread up **/ - public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay, int wakeupTriggerCount) { this.name = name; this.fatalHandler = fatalHandler; // NB: this must be set first this.cryptoEngine = cryptoEngine; this.tcpNoDelay = tcpNoDelay; + this.wakeupTriggerCount = Math.max(1, wakeupTriggerCount); connector = new Connector(); worker = new Worker(this); runCnt = new AtomicInteger(numThreads); @@ -55,10 +58,23 @@ public class Transport { threads.add(new TransportThread(this, i)); } } - public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); } - public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); } - public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } - public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); } + public Transport(String name, CryptoEngine cryptoEngine, int numThreads, int wakeupTriggerCount) { + this(name, null, cryptoEngine, numThreads, true, wakeupTriggerCount); + } + public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { + this(name, null, cryptoEngine, numThreads, true, 1); + } + public Transport(String name, int numThreads, int wakeupTriggerCount) { + this(name, null, CryptoEngine.createDefault(), numThreads, true, wakeupTriggerCount); + } + public Transport(String name, int numThreads, boolean tcpNoDelay, int wakeupTriggerCount) { + this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay, wakeupTriggerCount); } + public Transport(String name, int numThreads) { + this(name, null, CryptoEngine.createDefault(), numThreads, true, 1); + } + public Transport(String name) { + this(name, null, CryptoEngine.createDefault(), 1, true, 1); + } // Only for testing public Transport() { this("default"); } @@ -72,6 +88,7 @@ public class Transport { } boolean getTcpNoDelay() { return tcpNoDelay; } + int getWakeupTriggerCount() { return wakeupTriggerCount; } String getName() { return name; } diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java index 8f158161888..e42534d12fe 100644 --- a/jrt/src/com/yahoo/jrt/TransportThread.java +++ b/jrt/src/com/yahoo/jrt/TransportThread.java @@ -120,15 +120,15 @@ public class TransportThread { } private boolean postCommand(Runnable cmd) { - boolean wakeup; + int qlen; synchronized (this) { if (state == CLOSED) { return false; } - wakeup = queue.isEmpty(); queue.enqueue(cmd); + qlen = queue.size(); } - if (wakeup) { + if (qlen == parent.getWakeupTriggerCount()) { selector.wakeup(); } return true; |