diff options
7 files changed, 60 insertions, 16 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 113d99f77f9..c32d6fd5edc 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -120,7 +120,10 @@ public final class SessionCache extends AbstractComponent { RPCNetworkParams netParams = new RPCNetworkParams() .setSlobrokConfigId(slobrokConfigId) .setIdentity(new Identity(identity)) - .setListenPort(mbusConfig.port()); + .setListenPort(mbusConfig.port()) + .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) + .setNumNetworkThreads(mbusConfig.numthreads()) + .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimization().name())); return SharedMessageBus.newInstance(mbusParams, netParams); } diff --git a/container-messagebus/src/main/resources/configdefinitions/container-mbus.def b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def index b18bec66959..92d3cfbe512 100644 --- a/container-messagebus/src/main/resources/configdefinitions/container-mbus.def +++ b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def @@ -2,9 +2,24 @@ namespace=container.jdisc #settings for message bus in container -enabled bool default=false + +# Which network port is used port int default=0 + +# Number of connections per target +numconnectionspertarget int default=1 + +# Number network threads +numthreads int default=2 + +# Optimize for latency, or throughput. +optimization enum {LATENCY, THROUGHPUT} default=LATENCY + +# Everying below is deprecated and will go away very soon. +# Dynamic throttling is used, and works better than anything else. maxpendingcount int default=2048 + +enabled bool default=false #maxpendingsize is set in megabytes! maxpendingsize int default=100 diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java index 9e9dafcbcb5..aed22ac090c 100644 --- a/jrt/src/com/yahoo/jrt/Acceptor.java +++ b/jrt/src/com/yahoo/jrt/Acceptor.java @@ -101,7 +101,7 @@ public class Acceptor { while (serverChannel.isOpen()) { try { TransportThread tt = parent.selectThread(); - tt.addConnection(new Connection(tt, owner, serverChannel.accept())); + tt.addConnection(new Connection(tt, owner, serverChannel.accept(), parent.getTcpNoDelay())); tt.sync(); } catch (ClosedChannelException ignore) { } catch (Exception e) { diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index c9c6d78ffba..5a4478cf91e 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -36,6 +36,7 @@ class Connection extends Target { private final Buffer output = new Buffer(WRITE_SIZE * 2); private int maxInputSize = 64*1024; private int maxOutputSize = 64*1024; + private final boolean tcpNoDelay; private final Map<Integer, ReplyHandler> replyMap = new HashMap<>(); private final Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>(); private int activeReqs = 0; @@ -89,21 +90,23 @@ class Connection extends Target { } public Connection(TransportThread parent, Supervisor owner, - SocketChannel channel) { + SocketChannel channel, boolean tcpNoDelay) { this.parent = parent; this.owner = owner; this.socket = parent.transport().createServerCryptoSocket(channel); this.spec = null; + this.tcpNoDelay = tcpNoDelay; server = true; owner.sessionInit(this); } - public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context) { + public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context, boolean tcpNoDelay) { super(context); this.parent = parent; this.owner = owner; this.spec = spec; + this.tcpNoDelay = tcpNoDelay; server = false; owner.sessionInit(this); } @@ -184,7 +187,7 @@ class Connection extends Target { } try { socket.channel().configureBlocking(false); - socket.channel().socket().setTcpNoDelay(true); + socket.channel().socket().setTcpNoDelay(tcpNoDelay); selectionKey = socket.channel().register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, this); diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 8abd3942a39..02a6e3e05f7 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -25,6 +25,7 @@ public class Transport { private final Connector connector; private final Worker worker; private final AtomicInteger runCnt; + private final boolean tcpNoDelay; private final TransportMetrics metrics = TransportMetrics.getInstance(); private final ArrayList<TransportThread> threads = new ArrayList<>(); @@ -40,11 +41,10 @@ public class Transport { * @param cryptoEngine crypto engine to use * @param numThreads number of {@link TransportThread}s. **/ - public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads) { - synchronized (this) { - this.fatalHandler = fatalHandler; // NB: this must be set first - } + public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + this.fatalHandler = fatalHandler; // NB: this must be set first this.cryptoEngine = cryptoEngine; + this.tcpNoDelay = tcpNoDelay; connector = new Connector(); worker = new Worker(this); runCnt = new AtomicInteger(numThreads); @@ -52,10 +52,10 @@ public class Transport { threads.add(new TransportThread(this)); } } - public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads); } - public Transport(FatalErrorHandler fatalHandler, int numThreads) { this(fatalHandler, CryptoEngine.createDefault(), numThreads); } - public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads); } - public Transport() { this(null, CryptoEngine.createDefault(), 1); } + public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); } + public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); } + public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } + public Transport() { this(null, CryptoEngine.createDefault(), 1, true); } /** * Select a random transport thread @@ -66,6 +66,8 @@ public class Transport { return threads.get(rnd.nextInt(threads.size())); } + boolean getTcpNoDelay() { return tcpNoDelay; } + /** * Use the underlying CryptoEngine to create a CryptoSocket for * the client side of a connection. @@ -130,7 +132,7 @@ public class Transport { * @param context application context for the new connection */ Connection connect(Supervisor owner, Spec spec, Object context) { - Connection conn = new Connection(selectThread(), owner, spec, context); + Connection conn = new Connection(selectThread(), owner, spec, context, getTcpNoDelay()); connector.connectLater(conn); return conn; } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 0fd52e9bdbc..602c33bd6f4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -82,7 +82,7 @@ public class RPCNetwork implements Network, MethodHandler { public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) { this.slobroksConfig = slobrokConfig; identity = params.getIdentity(); - orb = new Supervisor(new Transport(2)); + orb = new Supervisor(new Transport(params.getNumNetworkThreads(), params.getOptimization() == RPCNetworkParams.Optimization.LATENCY)); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec()); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index d6d7603f54a..e77cddd8b06 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -20,6 +20,9 @@ public class RPCNetworkParams { private int maxOutputBufferSize = 256 * 1024; private double connectionExpireSecs = 30; private int numTargetsPerSpec = 1; + private int numNetworkThreads = 2; + public enum Optimization {LATENCY, THROUGHPUT} + Optimization optimization = Optimization.LATENCY; /** * Constructs a new instance of this class with reasonable default values. @@ -42,6 +45,8 @@ public class RPCNetworkParams { maxInputBufferSize = params.maxInputBufferSize; maxOutputBufferSize = params.maxOutputBufferSize; numTargetsPerSpec = params.numTargetsPerSpec; + numNetworkThreads = params.numNetworkThreads; + optimization = params.optimization; } /** @@ -152,6 +157,22 @@ public class RPCNetworkParams { return numTargetsPerSpec; } + public RPCNetworkParams setNumNetworkThreads(int numNetworkThreads) { + this.numNetworkThreads = numNetworkThreads; + return this; + } + int getNumNetworkThreads() { + return numNetworkThreads; + } + + public RPCNetworkParams setOptimization(Optimization optimization) { + this.optimization = optimization; + return this; + } + Optimization getOptimization() { + return optimization; + } + /** * Returns the maximum input buffer size allowed for the underlying FNET connection. * |