summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java5
-rw-r--r--container-messagebus/src/main/resources/configdefinitions/container-mbus.def17
-rw-r--r--jrt/src/com/yahoo/jrt/Acceptor.java2
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java9
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java20
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java21
7 files changed, 60 insertions, 16 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 113d99f77f9..c32d6fd5edc 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -120,7 +120,10 @@ public final class SessionCache extends AbstractComponent {
RPCNetworkParams netParams = new RPCNetworkParams()
.setSlobrokConfigId(slobrokConfigId)
.setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port());
+ .setListenPort(mbusConfig.port())
+ .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
+ .setNumNetworkThreads(mbusConfig.numthreads())
+ .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimization().name()));
return SharedMessageBus.newInstance(mbusParams, netParams);
}
diff --git a/container-messagebus/src/main/resources/configdefinitions/container-mbus.def b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def
index b18bec66959..92d3cfbe512 100644
--- a/container-messagebus/src/main/resources/configdefinitions/container-mbus.def
+++ b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def
@@ -2,9 +2,24 @@
namespace=container.jdisc
#settings for message bus in container
-enabled bool default=false
+
+# Which network port is used
port int default=0
+
+# Number of connections per target
+numconnectionspertarget int default=1
+
+# Number network threads
+numthreads int default=2
+
+# Optimize for latency, or throughput.
+optimization enum {LATENCY, THROUGHPUT} default=LATENCY
+
+# Everying below is deprecated and will go away very soon.
+# Dynamic throttling is used, and works better than anything else.
maxpendingcount int default=2048
+
+enabled bool default=false
#maxpendingsize is set in megabytes!
maxpendingsize int default=100
diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java
index 9e9dafcbcb5..aed22ac090c 100644
--- a/jrt/src/com/yahoo/jrt/Acceptor.java
+++ b/jrt/src/com/yahoo/jrt/Acceptor.java
@@ -101,7 +101,7 @@ public class Acceptor {
while (serverChannel.isOpen()) {
try {
TransportThread tt = parent.selectThread();
- tt.addConnection(new Connection(tt, owner, serverChannel.accept()));
+ tt.addConnection(new Connection(tt, owner, serverChannel.accept(), parent.getTcpNoDelay()));
tt.sync();
} catch (ClosedChannelException ignore) {
} catch (Exception e) {
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index c9c6d78ffba..5a4478cf91e 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -36,6 +36,7 @@ class Connection extends Target {
private final Buffer output = new Buffer(WRITE_SIZE * 2);
private int maxInputSize = 64*1024;
private int maxOutputSize = 64*1024;
+ private final boolean tcpNoDelay;
private final Map<Integer, ReplyHandler> replyMap = new HashMap<>();
private final Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
private int activeReqs = 0;
@@ -89,21 +90,23 @@ class Connection extends Target {
}
public Connection(TransportThread parent, Supervisor owner,
- SocketChannel channel) {
+ SocketChannel channel, boolean tcpNoDelay) {
this.parent = parent;
this.owner = owner;
this.socket = parent.transport().createServerCryptoSocket(channel);
this.spec = null;
+ this.tcpNoDelay = tcpNoDelay;
server = true;
owner.sessionInit(this);
}
- public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context) {
+ public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context, boolean tcpNoDelay) {
super(context);
this.parent = parent;
this.owner = owner;
this.spec = spec;
+ this.tcpNoDelay = tcpNoDelay;
server = false;
owner.sessionInit(this);
}
@@ -184,7 +187,7 @@ class Connection extends Target {
}
try {
socket.channel().configureBlocking(false);
- socket.channel().socket().setTcpNoDelay(true);
+ socket.channel().socket().setTcpNoDelay(tcpNoDelay);
selectionKey = socket.channel().register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE,
this);
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 8abd3942a39..02a6e3e05f7 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -25,6 +25,7 @@ public class Transport {
private final Connector connector;
private final Worker worker;
private final AtomicInteger runCnt;
+ private final boolean tcpNoDelay;
private final TransportMetrics metrics = TransportMetrics.getInstance();
private final ArrayList<TransportThread> threads = new ArrayList<>();
@@ -40,11 +41,10 @@ public class Transport {
* @param cryptoEngine crypto engine to use
* @param numThreads number of {@link TransportThread}s.
**/
- public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads) {
- synchronized (this) {
- this.fatalHandler = fatalHandler; // NB: this must be set first
- }
+ public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) {
+ this.fatalHandler = fatalHandler; // NB: this must be set first
this.cryptoEngine = cryptoEngine;
+ this.tcpNoDelay = tcpNoDelay;
connector = new Connector();
worker = new Worker(this);
runCnt = new AtomicInteger(numThreads);
@@ -52,10 +52,10 @@ public class Transport {
threads.add(new TransportThread(this));
}
}
- public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads); }
- public Transport(FatalErrorHandler fatalHandler, int numThreads) { this(fatalHandler, CryptoEngine.createDefault(), numThreads); }
- public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads); }
- public Transport() { this(null, CryptoEngine.createDefault(), 1); }
+ public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); }
+ public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); }
+ public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); }
+ public Transport() { this(null, CryptoEngine.createDefault(), 1, true); }
/**
* Select a random transport thread
@@ -66,6 +66,8 @@ public class Transport {
return threads.get(rnd.nextInt(threads.size()));
}
+ boolean getTcpNoDelay() { return tcpNoDelay; }
+
/**
* Use the underlying CryptoEngine to create a CryptoSocket for
* the client side of a connection.
@@ -130,7 +132,7 @@ public class Transport {
* @param context application context for the new connection
*/
Connection connect(Supervisor owner, Spec spec, Object context) {
- Connection conn = new Connection(selectThread(), owner, spec, context);
+ Connection conn = new Connection(selectThread(), owner, spec, context, getTcpNoDelay());
connector.connectLater(conn);
return conn;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 0fd52e9bdbc..602c33bd6f4 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -82,7 +82,7 @@ public class RPCNetwork implements Network, MethodHandler {
public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) {
this.slobroksConfig = slobrokConfig;
identity = params.getIdentity();
- orb = new Supervisor(new Transport(2));
+ orb = new Supervisor(new Transport(params.getNumNetworkThreads(), params.getOptimization() == RPCNetworkParams.Optimization.LATENCY));
orb.setMaxInputBufferSize(params.getMaxInputBufferSize());
orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize());
targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec());
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
index d6d7603f54a..e77cddd8b06 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
@@ -20,6 +20,9 @@ public class RPCNetworkParams {
private int maxOutputBufferSize = 256 * 1024;
private double connectionExpireSecs = 30;
private int numTargetsPerSpec = 1;
+ private int numNetworkThreads = 2;
+ public enum Optimization {LATENCY, THROUGHPUT}
+ Optimization optimization = Optimization.LATENCY;
/**
* Constructs a new instance of this class with reasonable default values.
@@ -42,6 +45,8 @@ public class RPCNetworkParams {
maxInputBufferSize = params.maxInputBufferSize;
maxOutputBufferSize = params.maxOutputBufferSize;
numTargetsPerSpec = params.numTargetsPerSpec;
+ numNetworkThreads = params.numNetworkThreads;
+ optimization = params.optimization;
}
/**
@@ -152,6 +157,22 @@ public class RPCNetworkParams {
return numTargetsPerSpec;
}
+ public RPCNetworkParams setNumNetworkThreads(int numNetworkThreads) {
+ this.numNetworkThreads = numNetworkThreads;
+ return this;
+ }
+ int getNumNetworkThreads() {
+ return numNetworkThreads;
+ }
+
+ public RPCNetworkParams setOptimization(Optimization optimization) {
+ this.optimization = optimization;
+ return this;
+ }
+ Optimization getOptimization() {
+ return optimization;
+ }
+
/**
* Returns the maximum input buffer size allowed for the underlying FNET connection.
*