aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-25 14:31:21 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-25 14:31:21 +0000
commit5ba4494cacab2dda7e037aecd18bf9055033094d (patch)
tree57cfd6333903577d8aeed7fd1b072a49e7fa1bed /jrt
parente1424276029d0c364a33724eb3b03b7807e17c9e (diff)
Add control over
- num network threads - num connections per target - tcpnodelay Defaults are unchanged.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Acceptor.java2
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java9
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java20
3 files changed, 18 insertions, 13 deletions
diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java
index 9e9dafcbcb5..aed22ac090c 100644
--- a/jrt/src/com/yahoo/jrt/Acceptor.java
+++ b/jrt/src/com/yahoo/jrt/Acceptor.java
@@ -101,7 +101,7 @@ public class Acceptor {
while (serverChannel.isOpen()) {
try {
TransportThread tt = parent.selectThread();
- tt.addConnection(new Connection(tt, owner, serverChannel.accept()));
+ tt.addConnection(new Connection(tt, owner, serverChannel.accept(), parent.getTcpNoDelay()));
tt.sync();
} catch (ClosedChannelException ignore) {
} catch (Exception e) {
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index c9c6d78ffba..5a4478cf91e 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -36,6 +36,7 @@ class Connection extends Target {
private final Buffer output = new Buffer(WRITE_SIZE * 2);
private int maxInputSize = 64*1024;
private int maxOutputSize = 64*1024;
+ private final boolean tcpNoDelay;
private final Map<Integer, ReplyHandler> replyMap = new HashMap<>();
private final Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
private int activeReqs = 0;
@@ -89,21 +90,23 @@ class Connection extends Target {
}
public Connection(TransportThread parent, Supervisor owner,
- SocketChannel channel) {
+ SocketChannel channel, boolean tcpNoDelay) {
this.parent = parent;
this.owner = owner;
this.socket = parent.transport().createServerCryptoSocket(channel);
this.spec = null;
+ this.tcpNoDelay = tcpNoDelay;
server = true;
owner.sessionInit(this);
}
- public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context) {
+ public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context, boolean tcpNoDelay) {
super(context);
this.parent = parent;
this.owner = owner;
this.spec = spec;
+ this.tcpNoDelay = tcpNoDelay;
server = false;
owner.sessionInit(this);
}
@@ -184,7 +187,7 @@ class Connection extends Target {
}
try {
socket.channel().configureBlocking(false);
- socket.channel().socket().setTcpNoDelay(true);
+ socket.channel().socket().setTcpNoDelay(tcpNoDelay);
selectionKey = socket.channel().register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE,
this);
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 8abd3942a39..02a6e3e05f7 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -25,6 +25,7 @@ public class Transport {
private final Connector connector;
private final Worker worker;
private final AtomicInteger runCnt;
+ private final boolean tcpNoDelay;
private final TransportMetrics metrics = TransportMetrics.getInstance();
private final ArrayList<TransportThread> threads = new ArrayList<>();
@@ -40,11 +41,10 @@ public class Transport {
* @param cryptoEngine crypto engine to use
* @param numThreads number of {@link TransportThread}s.
**/
- public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads) {
- synchronized (this) {
- this.fatalHandler = fatalHandler; // NB: this must be set first
- }
+ public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) {
+ this.fatalHandler = fatalHandler; // NB: this must be set first
this.cryptoEngine = cryptoEngine;
+ this.tcpNoDelay = tcpNoDelay;
connector = new Connector();
worker = new Worker(this);
runCnt = new AtomicInteger(numThreads);
@@ -52,10 +52,10 @@ public class Transport {
threads.add(new TransportThread(this));
}
}
- public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads); }
- public Transport(FatalErrorHandler fatalHandler, int numThreads) { this(fatalHandler, CryptoEngine.createDefault(), numThreads); }
- public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads); }
- public Transport() { this(null, CryptoEngine.createDefault(), 1); }
+ public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); }
+ public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); }
+ public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); }
+ public Transport() { this(null, CryptoEngine.createDefault(), 1, true); }
/**
* Select a random transport thread
@@ -66,6 +66,8 @@ public class Transport {
return threads.get(rnd.nextInt(threads.size()));
}
+ boolean getTcpNoDelay() { return tcpNoDelay; }
+
/**
* Use the underlying CryptoEngine to create a CryptoSocket for
* the client side of a connection.
@@ -130,7 +132,7 @@ public class Transport {
* @param context application context for the new connection
*/
Connection connect(Supervisor owner, Spec spec, Object context) {
- Connection conn = new Connection(selectThread(), owner, spec, context);
+ Connection conn = new Connection(selectThread(), owner, spec, context, getTcpNoDelay());
connector.connectLater(conn);
return conn;
}