summaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@gmail.com>2019-02-25 15:35:20 +0100
committerHåvard Pettersen <havardpe@gmail.com>2019-02-26 15:44:43 +0100
commitd5f72c654b4a45829df84dcb7a311d786c2575fa (patch)
treeb3eda4e5137412b2c5605a6df437d023429b9f36 /jrt
parentd3c80d67c20b23b3cff0ed49f0f6fa57bce703df (diff)
async tls handshake in jrt
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Closer.java54
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java30
-rw-r--r--jrt/src/com/yahoo/jrt/ThreadQueue.java18
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java42
-rw-r--r--jrt/src/com/yahoo/jrt/Worker.java85
-rw-r--r--jrt/tests/com/yahoo/jrt/LatencyTest.java84
6 files changed, 248 insertions, 65 deletions
diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java
deleted file mode 100644
index 71d99807253..00000000000
--- a/jrt/src/com/yahoo/jrt/Closer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.jrt;
-
-
-class Closer {
-
- private class Run implements Runnable {
- public void run() {
- try {
- Closer.this.run();
- } catch (Throwable problem) {
- parent.handleFailure(problem, Closer.this);
- }
- }
- }
-
- private Thread thread = new Thread(new Run(), "<jrt-closer>");
- private Transport parent;
- private ThreadQueue closeQueue = new ThreadQueue();
-
- public Closer(Transport parent) {
- this.parent = parent;
- thread.setDaemon(true);
- thread.start();
- }
-
- public void closeLater(Connection c) {
- if (!closeQueue.enqueue(c)) {
- c.closeSocket();
- }
- }
-
- private void run() {
- try {
- while (true) {
- ((Connection)closeQueue.dequeue()).closeSocket();
- }
- } catch (EndOfQueueException e) {}
- }
-
- public Closer shutdown() {
- closeQueue.close();
- return this;
- }
-
- public void join() {
- while (true) {
- try {
- thread.join();
- return;
- } catch (InterruptedException e) {}
- }
- }
-}
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index df7acf881df..36e1f1c8a8e 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -41,6 +41,7 @@ class Connection extends Target {
private Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
private int activeReqs = 0;
private int writeWork = 0;
+ private boolean pendingHandshakeWork = false;
private Transport parent;
private Supervisor owner;
private Spec spec;
@@ -217,11 +218,10 @@ class Connection extends Target {
}
private void handshake() throws IOException {
- HandshakeResult result;
- while ((result = socket.handshake()) == HandshakeResult.NEED_WORK) {
- socket.doHandshakeWork();
+ if (pendingHandshakeWork) {
+ return;
}
- switch (result) {
+ switch (socket.handshake()) {
case DONE:
if (socket.getMinimumReadBufferSize() > readSize) {
readSize = socket.getMinimumReadBufferSize();
@@ -239,6 +239,28 @@ class Connection extends Target {
disableRead();
enableWrite();
break;
+ case NEED_WORK:
+ disableRead();
+ disableWrite();
+ pendingHandshakeWork = true;
+ parent.doHandshakeWork(this);
+ break;
+ }
+ }
+
+ public void doHandshakeWork() {
+ socket.doHandshakeWork();
+ }
+
+ public void handleHandshakeWorkDone() throws IOException {
+ if (!pendingHandshakeWork) {
+ throw new IllegalStateException("jrt: got unwanted handshake work done event");
+ }
+ pendingHandshakeWork = false;
+ if (state == CONNECTING) {
+ handshake();
+ } else {
+ throw new IOException("jrt: got handshake work done event in incompatible state: " + state);
}
}
diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java
index 119008d835d..b758a30214a 100644
--- a/jrt/src/com/yahoo/jrt/ThreadQueue.java
+++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java
@@ -21,8 +21,22 @@ class ThreadQueue
* was closed
* @param obj the object to enqueue
**/
- public synchronized boolean enqueue(Object obj) {
- if (closed) {
+ public boolean enqueue(Object obj) {
+ return enqueue(obj, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Enqueue an object on this queue. If the queue has been closed or
+ * the queue already contains too many items, the object will not be
+ * queued, and this method will return false.
+ *
+ * @return true if the object was enqueued, false if this queue
+ * was closed or too large
+ * @param obj the object to enqueue
+ * @param limit more elements than this means the queue is too large
+ **/
+ public synchronized boolean enqueue(Object obj, int limit) {
+ if (closed || (queue.size() > limit)) {
return false;
}
queue.enqueue(obj);
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 0a2f2a4b7cb..33ce6fe6ed0 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -52,6 +52,12 @@ public class Transport {
public void run() { handleEnableWrite(conn); }
}
+ private class HandshakeWorkDoneCmd implements Runnable {
+ private Connection conn;
+ HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; }
+ public void run() { handleHandshakeWorkDone(conn); }
+ }
+
private class SyncCmd implements Runnable {
boolean done = false;
public synchronized void waitDone() {
@@ -73,7 +79,7 @@ public class Transport {
private Queue queue;
private Queue myQueue;
private Connector connector;
- private Closer closer;
+ private Worker worker;
private Scheduler scheduler;
private int state;
private Selector selector;
@@ -82,7 +88,7 @@ public class Transport {
private void handleAddConnection(Connection conn) {
if (conn.isClosed()) {
if (conn.hasSocket()) {
- closer.closeLater(conn);
+ worker.closeLater(conn);
}
return;
}
@@ -97,7 +103,7 @@ public class Transport {
}
conn.fini();
if (conn.hasSocket()) {
- closer.closeLater(conn);
+ worker.closeLater(conn);
}
}
@@ -108,6 +114,18 @@ public class Transport {
conn.enableWrite();
}
+ private void handleHandshakeWorkDone(Connection conn) {
+ if (conn.isClosed()) {
+ return;
+ }
+ try {
+ conn.handleHandshakeWorkDone();
+ } catch (IOException e) {
+ conn.setLostReason(e);
+ handleCloseConnection(conn);
+ }
+ }
+
private boolean postCommand(Runnable cmd) {
boolean wakeup;
synchronized (this) {
@@ -174,7 +192,7 @@ public class Transport {
queue = new Queue();
myQueue = new Queue();
connector = new Connector(this);
- closer = new Closer(this);
+ worker = new Worker(this);
scheduler = new Scheduler(System.currentTimeMillis());
state = OPEN;
try {
@@ -287,6 +305,20 @@ public class Transport {
}
}
+ void handshakeWorkDone(Connection conn) {
+ postCommand(new HandshakeWorkDoneCmd(conn));
+ }
+
+ /**
+ * Request that {@link Connection#doHandshakeWork()} be called (in any thread)
+ * followed by a call to {@link Connection#handleHandshakeWorkDone()} from the transport thread.
+ *
+ * @param conn the connection needing handshake work
+ */
+ void doHandshakeWork(Connection conn) {
+ worker.doHandshakeWork(conn);
+ }
+
/**
* Create a {@link Task} that can be scheduled for execution in
* the transport thread.
@@ -379,7 +411,7 @@ public class Transport {
handleCloseConnection(conn);
}
try { selector.close(); } catch (Exception e) {}
- closer.shutdown().join();
+ worker.shutdown().join();
connector.exit().join();
try { cryptoEngine.close(); } catch (Exception e) {}
}
diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java
new file mode 100644
index 00000000000..41aeaa9b4ed
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Worker.java
@@ -0,0 +1,85 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class Worker {
+
+ private static int WORK_LIMIT = 1024;
+
+ private class Run implements Runnable {
+ public void run() {
+ try {
+ Worker.this.run();
+ } catch (Throwable problem) {
+ parent.handleFailure(problem, Worker.this);
+ }
+ }
+ }
+
+ private static class CloseSocket implements Runnable {
+ Connection connection;
+ CloseSocket(Connection c) {
+ connection = c;
+ }
+ public void run() {
+ connection.closeSocket();
+ }
+ }
+
+ private static class DoHandshakeWork implements Runnable {
+ private Connection connection;
+ DoHandshakeWork(Connection c) {
+ connection = c;
+ }
+ public void run() {
+ connection.doHandshakeWork();
+ connection.transport().handshakeWorkDone(connection);
+ }
+ }
+
+ private Thread thread = new Thread(new Run(), "<jrt-worker>");
+ private Transport parent;
+ private ThreadQueue workQueue = new ThreadQueue();
+
+ public Worker(Transport parent) {
+ this.parent = parent;
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ private void doLater(Runnable r) {
+ if(!workQueue.enqueue(r, WORK_LIMIT)) {
+ r.run();
+ }
+ }
+
+ public void closeLater(Connection c) {
+ doLater(new CloseSocket(c));
+ }
+
+ public void doHandshakeWork(Connection c) {
+ doLater(new DoHandshakeWork(c));
+ }
+
+ private void run() {
+ try {
+ while (true) {
+ ((Runnable) workQueue.dequeue()).run();
+ }
+ } catch (EndOfQueueException e) {}
+ }
+
+ public Worker shutdown() {
+ workQueue.close();
+ return this;
+ }
+
+ public void join() {
+ while (true) {
+ try {
+ thread.join();
+ return;
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java
new file mode 100644
index 00000000000..a1f71bda013
--- /dev/null
+++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java
@@ -0,0 +1,84 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.jrt;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import static com.yahoo.jrt.CryptoUtils.createTestTlsContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LatencyTest {
+ private static final Logger log = Logger.getLogger(LatencyTest.class.getName());
+
+ private static class Server implements AutoCloseable {
+ private Supervisor orb;
+ private Acceptor acceptor;
+ public Server(CryptoEngine crypto) throws ListenFailedException {
+ orb = new Supervisor(new Transport(crypto));
+ acceptor = orb.listen(new Spec(0));
+ orb.addMethod(new Method("inc", "i", "i", this, "rpc_inc"));
+ }
+ public Target connect() {
+ return orb.connect(new Spec("localhost", acceptor.port()));
+ }
+ public void rpc_inc(Request req) {
+ req.returnValues().add(new Int32Value(req.parameters().get(0).asInt32() + 1));
+ }
+ public void close() {
+ acceptor.shutdown().join();
+ orb.transport().shutdown().join();
+ }
+ }
+
+ private void measureLatency(String prefix, Server server, boolean reconnect) {
+ int value = 100;
+ List<Double> list = new ArrayList<>();
+ Target target = server.connect();
+ for (int i = 0; i < 64; ++i) {
+ long before = System.nanoTime();
+ if (reconnect) {
+ target.close();
+ target = server.connect();
+ }
+ Request req = new Request("inc");
+ req.parameters().add(new Int32Value(value));
+ target.invokeSync(req, 60.0);
+ assertTrue(req.checkReturnTypes("i"));
+ assertEquals(value + 1, req.returnValues().get(0).asInt32());
+ value++;
+ long duration = System.nanoTime() - before;
+ list.add(duration / 1000000.0);
+ }
+ target.close();
+ Collections.sort(list);
+ log.info(prefix + "invocation latency: " + list.get(list.size() / 2) + " ms");
+ }
+
+ @org.junit.Test
+ public void testNullCryptoLatency() throws ListenFailedException {
+ try (Server server = new Server(new NullCryptoEngine())) {
+ measureLatency("[null crypto, no reconnect] ", server, false);
+ measureLatency("[null crypto, reconnect] ", server, true);
+ }
+ }
+
+ @org.junit.Test
+ public void testXorCryptoLatency() throws ListenFailedException {
+ try (Server server = new Server(new XorCryptoEngine())) {
+ measureLatency("[xor crypto, no reconnect] ", server, false);
+ measureLatency("[xor crypto, reconnect] ", server, true);
+ }
+ }
+
+ @org.junit.Test
+ public void testTlsCryptoLatency() throws ListenFailedException {
+ try (Server server = new Server(new TlsCryptoEngine(createTestTlsContext()))) {
+ measureLatency("[tls crypto, no reconnect] ", server, false);
+ measureLatency("[tls crypto, reconnect] ", server, true);
+ }
+ }
+}