aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2021-06-02 12:26:46 +0000
committerHåvard Pettersen <havardpe@oath.com>2021-06-02 12:26:46 +0000
commit15f10b8cc0e16acd28dc28d387625a4769509b67 (patch)
treef67b7a2c70ff3a788d41db2fc737d3e4db088d95 /jrt
parent48b61eb09f817c0f4153ad8d0989695f6758cff8 (diff)
drop empty buffers support for Java
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java13
-rw-r--r--jrt/src/com/yahoo/jrt/CryptoSocket.java7
-rw-r--r--jrt/src/com/yahoo/jrt/MaybeTlsCryptoSocket.java1
-rw-r--r--jrt/src/com/yahoo/jrt/NullCryptoSocket.java1
-rw-r--r--jrt/src/com/yahoo/jrt/Supervisor.java14
-rw-r--r--jrt/src/com/yahoo/jrt/TlsCryptoSocket.java5
-rw-r--r--jrt/src/com/yahoo/jrt/XorCryptoSocket.java5
-rw-r--r--jrt/tests/com/yahoo/jrt/LatencyTest.java12
8 files changed, 56 insertions, 2 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index 7393d30fc81..891558684ed 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -36,6 +36,7 @@ class Connection extends Target {
private final Buffer output = new Buffer(0x1000); // Start off with small buffer.
private int maxInputSize = 64*1024;
private int maxOutputSize = 64*1024;
+ private boolean dropEmptyBuffers = false;
private final boolean tcpNoDelay;
private final Map<Integer, ReplyHandler> replyMap = new HashMap<>();
private final Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
@@ -119,6 +120,10 @@ class Connection extends Target {
maxOutputSize = bytes;
}
+ public void setDropEmptyBuffers(boolean value) {
+ dropEmptyBuffers = value;
+ }
+
public TransportThread transportThread() {
return parent;
}
@@ -307,6 +312,10 @@ class Connection extends Target {
while (socket.drain(input.getChannelWritable(readSize)) > 0) {
handlePackets();
}
+ if (dropEmptyBuffers) {
+ socket.dropEmptyBuffers();
+ input.shrink(0);
+ }
if (maxInputSize > 0) {
input.shrink(maxInputSize);
}
@@ -363,6 +372,10 @@ class Connection extends Target {
if (disableWrite) {
disableWrite();
}
+ if (dropEmptyBuffers) {
+ socket.dropEmptyBuffers();
+ output.shrink(0);
+ }
if (maxOutputSize > 0) {
output.shrink(maxOutputSize);
}
diff --git a/jrt/src/com/yahoo/jrt/CryptoSocket.java b/jrt/src/com/yahoo/jrt/CryptoSocket.java
index e0489dec7a3..b7de30f9236 100644
--- a/jrt/src/com/yahoo/jrt/CryptoSocket.java
+++ b/jrt/src/com/yahoo/jrt/CryptoSocket.java
@@ -96,6 +96,13 @@ public interface CryptoSocket {
public FlushResult flush() throws IOException;
/**
+ * This function can be called at any time to drop any currently
+ * empty internal buffers. Typically called after drain or flush
+ * indicates that no further progress can be made.
+ **/
+ public void dropEmptyBuffers();
+
+ /**
* Returns the security context for the current connection (given handshake completed),
* or empty if the current connection is not secure.
*/
diff --git a/jrt/src/com/yahoo/jrt/MaybeTlsCryptoSocket.java b/jrt/src/com/yahoo/jrt/MaybeTlsCryptoSocket.java
index 60b7f342c9c..d84b33143f3 100644
--- a/jrt/src/com/yahoo/jrt/MaybeTlsCryptoSocket.java
+++ b/jrt/src/com/yahoo/jrt/MaybeTlsCryptoSocket.java
@@ -129,5 +129,6 @@ public class MaybeTlsCryptoSocket implements CryptoSocket {
@Override public int drain(ByteBuffer dst) throws IOException { return socket.drain(dst); }
@Override public int write(ByteBuffer src) throws IOException { return socket.write(src); }
@Override public FlushResult flush() throws IOException { return socket.flush(); }
+ @Override public void dropEmptyBuffers() { socket.dropEmptyBuffers(); }
@Override public Optional<SecurityContext> getSecurityContext() { return Optional.ofNullable(socket).flatMap(CryptoSocket::getSecurityContext); }
}
diff --git a/jrt/src/com/yahoo/jrt/NullCryptoSocket.java b/jrt/src/com/yahoo/jrt/NullCryptoSocket.java
index 83359bb65a5..81995d69a40 100644
--- a/jrt/src/com/yahoo/jrt/NullCryptoSocket.java
+++ b/jrt/src/com/yahoo/jrt/NullCryptoSocket.java
@@ -30,4 +30,5 @@ public class NullCryptoSocket implements CryptoSocket {
@Override public int drain(ByteBuffer dst) throws IOException { return 0; }
@Override public int write(ByteBuffer src) throws IOException { return channel.write(src); }
@Override public FlushResult flush() throws IOException { return FlushResult.DONE; }
+ @Override public void dropEmptyBuffers() {}
}
diff --git a/jrt/src/com/yahoo/jrt/Supervisor.java b/jrt/src/com/yahoo/jrt/Supervisor.java
index bcd525c9596..d7c2c83ea69 100644
--- a/jrt/src/com/yahoo/jrt/Supervisor.java
+++ b/jrt/src/com/yahoo/jrt/Supervisor.java
@@ -23,6 +23,7 @@ public class Supervisor {
private final AtomicReference<HashMap<String, Method>> methodMap = new AtomicReference<>(new HashMap<>());
private int maxInputBufferSize = 0;
private int maxOutputBufferSize = 0;
+ private boolean dropEmptyBuffers = false;
/**
* Create a new Supervisor based on the given {@link Transport}
@@ -46,6 +47,18 @@ public class Supervisor {
}
/**
+ * Drop empty buffers. This will reduce memory footprint for idle
+ * connections at the cost of extra allocations when buffer space
+ * is needed again.
+ *
+ * @param value true means drop empty buffers
+ **/
+ public Supervisor setDropEmptyBuffers(boolean value) {
+ dropEmptyBuffers = value;
+ return this;
+ }
+
+ /**
* Set maximum input buffer size. This value will only affect
* connections that use a common input buffer when decoding
* incoming packets. Note that this value is not an absolute
@@ -193,6 +206,7 @@ public class Supervisor {
Connection conn = (Connection) target;
conn.setMaxInputSize(maxInputBufferSize);
conn.setMaxOutputSize(maxOutputBufferSize);
+ conn.setDropEmptyBuffers(dropEmptyBuffers);
}
SessionHandler handler = sessionHandler;
if (handler != null) {
diff --git a/jrt/src/com/yahoo/jrt/TlsCryptoSocket.java b/jrt/src/com/yahoo/jrt/TlsCryptoSocket.java
index f140bb38c66..7ba83d6718e 100644
--- a/jrt/src/com/yahoo/jrt/TlsCryptoSocket.java
+++ b/jrt/src/com/yahoo/jrt/TlsCryptoSocket.java
@@ -216,6 +216,11 @@ public class TlsCryptoSocket implements CryptoSocket {
return wrapBuffer.bytes() > 0 ? FlushResult.NEED_WRITE : FlushResult.DONE;
}
+ @Override public void dropEmptyBuffers() {
+ wrapBuffer.shrink(0);
+ unwrapBuffer.shrink(0);
+ }
+
@Override
public Optional<SecurityContext> getSecurityContext() {
try {
diff --git a/jrt/src/com/yahoo/jrt/XorCryptoSocket.java b/jrt/src/com/yahoo/jrt/XorCryptoSocket.java
index 55f7b18c661..7477cd5816d 100644
--- a/jrt/src/com/yahoo/jrt/XorCryptoSocket.java
+++ b/jrt/src/com/yahoo/jrt/XorCryptoSocket.java
@@ -119,5 +119,8 @@ public class XorCryptoSocket implements CryptoSocket {
return FlushResult.DONE;
}
}
-
+ @Override public void dropEmptyBuffers() {
+ input.shrink(0);
+ output.shrink(0);
+ }
}
diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java
index c8ead8ebf77..1c736fb28ea 100644
--- a/jrt/tests/com/yahoo/jrt/LatencyTest.java
+++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java
@@ -17,12 +17,15 @@ public class LatencyTest {
private final Supervisor server;
private final Supervisor client;
private final Acceptor acceptor;
- public Network(CryptoEngine crypto, int threads) throws ListenFailedException {
+ public Network(CryptoEngine crypto, int threads, boolean dropEmpty) throws ListenFailedException {
server = new Supervisor(new Transport("server", crypto, threads));
client = new Supervisor(new Transport("client", crypto, threads));
+ server.setDropEmptyBuffers(dropEmpty);
+ client.setDropEmptyBuffers(dropEmpty);
server.addMethod(new Method("inc", "i", "i", this::rpc_inc));
acceptor = server.listen(new Spec(0));
}
+ public Network(CryptoEngine crypto, int threads) throws ListenFailedException { this(crypto, threads, false); }
public Target connect() {
return client.connect(new Spec("localhost", acceptor.port()));
}
@@ -188,6 +191,13 @@ public class LatencyTest {
}
@org.junit.Test
+ public void testTlsCryptoWithDropEmptyBuffersLatency() throws Throwable {
+ try (Network network = new Network(new TlsCryptoEngine(createTestTlsContext()), 1, true)) {
+ new Client(false, network, 1).measureLatency("[tls crypto, drop empty, no reconnect] ");
+ }
+ }
+
+ @org.junit.Test
public void testTransportThreadScaling() throws Throwable {
try (Network network = new Network(new NullCryptoEngine(), 1)) {
new Client(false, network, 64).measureLatency("[64 clients, 1/1 transport] ");