diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-25 14:31:21 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-25 14:31:21 +0000 |
commit | 5ba4494cacab2dda7e037aecd18bf9055033094d (patch) | |
tree | 57cfd6333903577d8aeed7fd1b072a49e7fa1bed /jrt | |
parent | e1424276029d0c364a33724eb3b03b7807e17c9e (diff) |
Add control over
- num network threads
- num connections per target
- tcpnodelay
Defaults are unchanged.
Diffstat (limited to 'jrt')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Acceptor.java | 2 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connection.java | 9 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 20 |
3 files changed, 18 insertions, 13 deletions
diff --git a/jrt/src/com/yahoo/jrt/Acceptor.java b/jrt/src/com/yahoo/jrt/Acceptor.java index 9e9dafcbcb5..aed22ac090c 100644 --- a/jrt/src/com/yahoo/jrt/Acceptor.java +++ b/jrt/src/com/yahoo/jrt/Acceptor.java @@ -101,7 +101,7 @@ public class Acceptor { while (serverChannel.isOpen()) { try { TransportThread tt = parent.selectThread(); - tt.addConnection(new Connection(tt, owner, serverChannel.accept())); + tt.addConnection(new Connection(tt, owner, serverChannel.accept(), parent.getTcpNoDelay())); tt.sync(); } catch (ClosedChannelException ignore) { } catch (Exception e) { diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index c9c6d78ffba..5a4478cf91e 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -36,6 +36,7 @@ class Connection extends Target { private final Buffer output = new Buffer(WRITE_SIZE * 2); private int maxInputSize = 64*1024; private int maxOutputSize = 64*1024; + private final boolean tcpNoDelay; private final Map<Integer, ReplyHandler> replyMap = new HashMap<>(); private final Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>(); private int activeReqs = 0; @@ -89,21 +90,23 @@ class Connection extends Target { } public Connection(TransportThread parent, Supervisor owner, - SocketChannel channel) { + SocketChannel channel, boolean tcpNoDelay) { this.parent = parent; this.owner = owner; this.socket = parent.transport().createServerCryptoSocket(channel); this.spec = null; + this.tcpNoDelay = tcpNoDelay; server = true; owner.sessionInit(this); } - public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context) { + public Connection(TransportThread parent, Supervisor owner, Spec spec, Object context, boolean tcpNoDelay) { super(context); this.parent = parent; this.owner = owner; this.spec = spec; + this.tcpNoDelay = tcpNoDelay; server = false; owner.sessionInit(this); } @@ -184,7 +187,7 @@ class Connection extends Target { } try { socket.channel().configureBlocking(false); - socket.channel().socket().setTcpNoDelay(true); + socket.channel().socket().setTcpNoDelay(tcpNoDelay); selectionKey = socket.channel().register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, this); diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 8abd3942a39..02a6e3e05f7 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -25,6 +25,7 @@ public class Transport { private final Connector connector; private final Worker worker; private final AtomicInteger runCnt; + private final boolean tcpNoDelay; private final TransportMetrics metrics = TransportMetrics.getInstance(); private final ArrayList<TransportThread> threads = new ArrayList<>(); @@ -40,11 +41,10 @@ public class Transport { * @param cryptoEngine crypto engine to use * @param numThreads number of {@link TransportThread}s. **/ - public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads) { - synchronized (this) { - this.fatalHandler = fatalHandler; // NB: this must be set first - } + public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads, boolean tcpNoDelay) { + this.fatalHandler = fatalHandler; // NB: this must be set first this.cryptoEngine = cryptoEngine; + this.tcpNoDelay = tcpNoDelay; connector = new Connector(); worker = new Worker(this); runCnt = new AtomicInteger(numThreads); @@ -52,10 +52,10 @@ public class Transport { threads.add(new TransportThread(this)); } } - public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads); } - public Transport(FatalErrorHandler fatalHandler, int numThreads) { this(fatalHandler, CryptoEngine.createDefault(), numThreads); } - public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads); } - public Transport() { this(null, CryptoEngine.createDefault(), 1); } + public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads, true); } + public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads, true); } + public Transport(int numThreads, boolean tcpNoDelay) { this(null, CryptoEngine.createDefault(), numThreads, tcpNoDelay); } + public Transport() { this(null, CryptoEngine.createDefault(), 1, true); } /** * Select a random transport thread @@ -66,6 +66,8 @@ public class Transport { return threads.get(rnd.nextInt(threads.size())); } + boolean getTcpNoDelay() { return tcpNoDelay; } + /** * Use the underlying CryptoEngine to create a CryptoSocket for * the client side of a connection. @@ -130,7 +132,7 @@ public class Transport { * @param context application context for the new connection */ Connection connect(Supervisor owner, Spec spec, Object context) { - Connection conn = new Connection(selectThread(), owner, spec, context); + Connection conn = new Connection(selectThread(), owner, spec, context, getTcpNoDelay()); connector.connectLater(conn); return conn; } |