summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java1
-rw-r--r--container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def3
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java27
-rw-r--r--jrt/src/com/yahoo/jrt/TransportThread.java19
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java3
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java11
6 files changed, 48 insertions, 16 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 68b1f5aa5db..7f67f7ec403 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -123,6 +123,7 @@ public final class SessionCache extends AbstractComponent {
.setListenPort(mbusConfig.port())
.setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
.setNumNetworkThreads(mbusConfig.numthreads())
+ .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
.setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
return SharedMessageBus.newInstance(mbusParams, netParams);
}
diff --git a/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def b/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def
index 9aef2b32a66..d3aa3bbdc4a 100644
--- a/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def
+++ b/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def
@@ -15,6 +15,9 @@ numthreads int default=2
# Optimize for latency, or throughput.
optimize_for enum {LATENCY, THROUGHPUT} default=LATENCY
+# Number of events before triggering wakeup of network thread.
+transport_events_before_wakeup int default=1
+
# Everying below is deprecated and will go away very soon.
# Dynamic throttling is used, and works better than anything else.
maxpendingcount int default=2048
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 003e40b8aa9..2d2e653956d 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -27,6 +27,7 @@ public class Transport {
private final Worker worker;
private final AtomicInteger runCnt;
private final boolean tcpNoDelay;
+ private final int eventsBeforeWakeup;
private final TransportMetrics metrics = TransportMetrics.getInstance();
private final ArrayList<TransportThread> threads = new ArrayList<>();
@@ -42,12 +43,14 @@ public class Transport {
* @param fatalHandler fatal error handler
* @param cryptoEngine crypto engine to use
* @param numThreads number of {@link TransportThread}s.
+ * @param eventsBeforeWakeup number write events in Q before waking thread up
**/
- public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) {
+ public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay, int eventsBeforeWakeup) {
this.name = name;
this.fatalHandler = fatalHandler; // NB: this must be set first
this.cryptoEngine = cryptoEngine;
this.tcpNoDelay = tcpNoDelay;
+ this.eventsBeforeWakeup = Math.max(1, eventsBeforeWakeup);
connector = new Connector();
worker = new Worker(this);
runCnt = new AtomicInteger(numThreads);
@@ -55,10 +58,23 @@ public class Transport {
threads.add(new TransportThread(this, i));
}
}
- public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); }
- public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); }
- public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); }
- public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); }
+ public Transport(String name, CryptoEngine cryptoEngine, int numThreads, int eventsBeforeWakeup) {
+ this(name, null, cryptoEngine, numThreads, true, eventsBeforeWakeup);
+ }
+ public Transport(String name, CryptoEngine cryptoEngine, int numThreads) {
+ this(name, null, cryptoEngine, numThreads, true, 1);
+ }
+ public Transport(String name, int numThreads, int eventsBeforeWakeup) {
+ this(name, null, CryptoEngine.createDefault(), numThreads, true, eventsBeforeWakeup);
+ }
+ public Transport(String name, int numThreads, boolean tcpNoDelay, int eventsBeforeWakeup) {
+ this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay, eventsBeforeWakeup); }
+ public Transport(String name, int numThreads) {
+ this(name, null, CryptoEngine.createDefault(), numThreads, true, 1);
+ }
+ public Transport(String name) {
+ this(name, null, CryptoEngine.createDefault(), 1, true, 1);
+ }
// Only for testing
public Transport() { this("default"); }
@@ -72,6 +88,7 @@ public class Transport {
}
boolean getTcpNoDelay() { return tcpNoDelay; }
+ int getEventsBeforeWakeup() { return eventsBeforeWakeup; }
String getName() { return name; }
diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java
index 8f158161888..107e0490405 100644
--- a/jrt/src/com/yahoo/jrt/TransportThread.java
+++ b/jrt/src/com/yahoo/jrt/TransportThread.java
@@ -5,7 +5,6 @@ package com.yahoo.jrt;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,30 +31,30 @@ public class TransportThread {
}
private class AddConnectionCmd implements Runnable {
- private Connection conn;
+ private final Connection conn;
AddConnectionCmd(Connection conn) { this.conn = conn; }
public void run() { handleAddConnection(conn); }
}
private class CloseConnectionCmd implements Runnable {
- private Connection conn;
+ private final Connection conn;
CloseConnectionCmd(Connection conn) { this.conn = conn; }
public void run() { handleCloseConnection(conn); }
}
private class EnableWriteCmd implements Runnable {
- private Connection conn;
+ private final Connection conn;
EnableWriteCmd(Connection conn) { this.conn = conn; }
public void run() { handleEnableWrite(conn); }
}
private class HandshakeWorkDoneCmd implements Runnable {
- private Connection conn;
+ private final Connection conn;
HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; }
public void run() { handleHandshakeWorkDone(conn); }
}
- private class SyncCmd implements Runnable {
+ private static class SyncCmd implements Runnable {
boolean done = false;
public synchronized void waitDone() {
while (!done) {
@@ -68,7 +67,7 @@ public class TransportThread {
}
}
- private static Logger log = Logger.getLogger(TransportThread.class.getName());
+ private static final Logger log = Logger.getLogger(TransportThread.class.getName());
private final Transport parent;
private final Thread thread;
@@ -120,15 +119,15 @@ public class TransportThread {
}
private boolean postCommand(Runnable cmd) {
- boolean wakeup;
+ int qlen;
synchronized (this) {
if (state == CLOSED) {
return false;
}
- wakeup = queue.isEmpty();
queue.enqueue(cmd);
+ qlen = queue.size();
}
- if (wakeup) {
+ if (qlen == parent.getEventsBeforeWakeup()) {
selector.wakeup();
}
return true;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index b0724ad6029..1c41f87d1ee 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -87,7 +87,8 @@ public class RPCNetwork implements Network, MethodHandler {
public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) {
this.slobroksConfig = slobrokConfig;
identity = params.getIdentity();
- orb = new Supervisor(new Transport("mbus-rpc-" + identity.getServicePrefix(), params.getNumNetworkThreads(), shouldEnableTcpNodelay(params.getOptimization())));
+ orb = new Supervisor(new Transport("mbus-rpc-" + identity.getServicePrefix(), params.getNumNetworkThreads(),
+ shouldEnableTcpNodelay(params.getOptimization()), params.getTransportEventsBeforeWakeup()));
orb.setMaxInputBufferSize(params.getMaxInputBufferSize());
orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize());
targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec());
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
index e77cddd8b06..db22363785d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
@@ -21,6 +21,8 @@ public class RPCNetworkParams {
private double connectionExpireSecs = 30;
private int numTargetsPerSpec = 1;
private int numNetworkThreads = 2;
+
+ private int transportEventsBeforeWakeup = 1;
public enum Optimization {LATENCY, THROUGHPUT}
Optimization optimization = Optimization.LATENCY;
@@ -216,4 +218,13 @@ public class RPCNetworkParams {
this.maxOutputBufferSize = maxOutputBufferSize;
return this;
}
+
+ public int getTransportEventsBeforeWakeup() {
+ return transportEventsBeforeWakeup;
+ }
+
+ public RPCNetworkParams setTransportEventsBeforeWakeup(int transportEventsBeforeWakeup) {
+ this.transportEventsBeforeWakeup = transportEventsBeforeWakeup;
+ return this;
+ }
}