diff options
6 files changed, 48 insertions, 16 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 68b1f5aa5db..7f67f7ec403 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -123,6 +123,7 @@ public final class SessionCache extends AbstractComponent { .setListenPort(mbusConfig.port()) .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) .setNumNetworkThreads(mbusConfig.numthreads()) + .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup()) .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name())); return SharedMessageBus.newInstance(mbusParams, netParams); } diff --git a/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def b/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def index 9aef2b32a66..d3aa3bbdc4a 100644 --- a/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def +++ b/container-messagebus/src/main/resources/configdefinitions/container.jdisc.container-mbus.def @@ -15,6 +15,9 @@ numthreads int default=2 # Optimize for latency, or throughput. optimize_for enum {LATENCY, THROUGHPUT} default=LATENCY +# Number of events before triggering wakeup of network thread. +transport_events_before_wakeup int default=1 + # Everying below is deprecated and will go away very soon. # Dynamic throttling is used, and works better than anything else. maxpendingcount int default=2048 diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 003e40b8aa9..2d2e653956d 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -27,6 +27,7 @@ public class Transport { private final Worker worker; private final AtomicInteger runCnt; private final boolean tcpNoDelay; + private final int eventsBeforeWakeup; private final TransportMetrics metrics = TransportMetrics.getInstance(); private final ArrayList<TransportThread> threads = new ArrayList<>(); @@ -42,12 +43,14 @@ public class Transport { * @param fatalHandler fatal error handler * @param cryptoEngine crypto engine to use * @param numThreads number of {@link TransportThread}s. + * @param eventsBeforeWakeup number write events in Q before waking thread up **/ - public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + public Transport(String name, FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay, int eventsBeforeWakeup) { this.name = name; this.fatalHandler = fatalHandler; // NB: this must be set first this.cryptoEngine = cryptoEngine; this.tcpNoDelay = tcpNoDelay; + this.eventsBeforeWakeup = Math.max(1, eventsBeforeWakeup); connector = new Connector(); worker = new Worker(this); runCnt = new AtomicInteger(numThreads); @@ -55,10 +58,23 @@ public class Transport { threads.add(new TransportThread(this, i)); } } - public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { this(name, null, cryptoEngine, numThreads, true); } - public Transport(String name, int numThreads) { this(name, null, CryptoEngine.createDefault(), numThreads, true); } - public Transport(String name, int numThreads, boolean tcpNoDelay) { this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } - public Transport(String name) { this(name, null, CryptoEngine.createDefault(), 1, true); } + public Transport(String name, CryptoEngine cryptoEngine, int numThreads, int eventsBeforeWakeup) { + this(name, null, cryptoEngine, numThreads, true, eventsBeforeWakeup); + } + public Transport(String name, CryptoEngine cryptoEngine, int numThreads) { + this(name, null, cryptoEngine, numThreads, true, 1); + } + public Transport(String name, int numThreads, int eventsBeforeWakeup) { + this(name, null, CryptoEngine.createDefault(), numThreads, true, eventsBeforeWakeup); + } + public Transport(String name, int numThreads, boolean tcpNoDelay, int eventsBeforeWakeup) { + this(name, null, CryptoEngine.createDefault(), numThreads, tcpNoDelay, eventsBeforeWakeup); } + public Transport(String name, int numThreads) { + this(name, null, CryptoEngine.createDefault(), numThreads, true, 1); + } + public Transport(String name) { + this(name, null, CryptoEngine.createDefault(), 1, true, 1); + } // Only for testing public Transport() { this("default"); } @@ -72,6 +88,7 @@ public class Transport { } boolean getTcpNoDelay() { return tcpNoDelay; } + int getEventsBeforeWakeup() { return eventsBeforeWakeup; } String getName() { return name; } diff --git a/jrt/src/com/yahoo/jrt/TransportThread.java b/jrt/src/com/yahoo/jrt/TransportThread.java index 8f158161888..107e0490405 100644 --- a/jrt/src/com/yahoo/jrt/TransportThread.java +++ b/jrt/src/com/yahoo/jrt/TransportThread.java @@ -5,7 +5,6 @@ package com.yahoo.jrt; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,30 +31,30 @@ public class TransportThread { } private class AddConnectionCmd implements Runnable { - private Connection conn; + private final Connection conn; AddConnectionCmd(Connection conn) { this.conn = conn; } public void run() { handleAddConnection(conn); } } private class CloseConnectionCmd implements Runnable { - private Connection conn; + private final Connection conn; CloseConnectionCmd(Connection conn) { this.conn = conn; } public void run() { handleCloseConnection(conn); } } private class EnableWriteCmd implements Runnable { - private Connection conn; + private final Connection conn; EnableWriteCmd(Connection conn) { this.conn = conn; } public void run() { handleEnableWrite(conn); } } private class HandshakeWorkDoneCmd implements Runnable { - private Connection conn; + private final Connection conn; HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; } public void run() { handleHandshakeWorkDone(conn); } } - private class SyncCmd implements Runnable { + private static class SyncCmd implements Runnable { boolean done = false; public synchronized void waitDone() { while (!done) { @@ -68,7 +67,7 @@ public class TransportThread { } } - private static Logger log = Logger.getLogger(TransportThread.class.getName()); + private static final Logger log = Logger.getLogger(TransportThread.class.getName()); private final Transport parent; private final Thread thread; @@ -120,15 +119,15 @@ public class TransportThread { } private boolean postCommand(Runnable cmd) { - boolean wakeup; + int qlen; synchronized (this) { if (state == CLOSED) { return false; } - wakeup = queue.isEmpty(); queue.enqueue(cmd); + qlen = queue.size(); } - if (wakeup) { + if (qlen == parent.getEventsBeforeWakeup()) { selector.wakeup(); } return true; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index b0724ad6029..1c41f87d1ee 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -87,7 +87,8 @@ public class RPCNetwork implements Network, MethodHandler { public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) { this.slobroksConfig = slobrokConfig; identity = params.getIdentity(); - orb = new Supervisor(new Transport("mbus-rpc-" + identity.getServicePrefix(), params.getNumNetworkThreads(), shouldEnableTcpNodelay(params.getOptimization()))); + orb = new Supervisor(new Transport("mbus-rpc-" + identity.getServicePrefix(), params.getNumNetworkThreads(), + shouldEnableTcpNodelay(params.getOptimization()), params.getTransportEventsBeforeWakeup())); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec()); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index e77cddd8b06..db22363785d 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -21,6 +21,8 @@ public class RPCNetworkParams { private double connectionExpireSecs = 30; private int numTargetsPerSpec = 1; private int numNetworkThreads = 2; + + private int transportEventsBeforeWakeup = 1; public enum Optimization {LATENCY, THROUGHPUT} Optimization optimization = Optimization.LATENCY; @@ -216,4 +218,13 @@ public class RPCNetworkParams { this.maxOutputBufferSize = maxOutputBufferSize; return this; } + + public int getTransportEventsBeforeWakeup() { + return transportEventsBeforeWakeup; + } + + public RPCNetworkParams setTransportEventsBeforeWakeup(int transportEventsBeforeWakeup) { + this.transportEventsBeforeWakeup = transportEventsBeforeWakeup; + return this; + } } |