From 19b3abbef47ccc08665af54dc8afcef2e49027a5 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Mon, 25 Feb 2019 15:35:20 +0100 Subject: async tls handshake in jrt --- jrt/src/com/yahoo/jrt/Closer.java | 54 -------------------- jrt/src/com/yahoo/jrt/Connection.java | 30 +++++++++-- jrt/src/com/yahoo/jrt/ThreadQueue.java | 18 ++++++- jrt/src/com/yahoo/jrt/Transport.java | 42 ++++++++++++++-- jrt/src/com/yahoo/jrt/Worker.java | 85 ++++++++++++++++++++++++++++++++ jrt/tests/com/yahoo/jrt/LatencyTest.java | 84 +++++++++++++++++++++++++++++++ 6 files changed, 248 insertions(+), 65 deletions(-) delete mode 100644 jrt/src/com/yahoo/jrt/Closer.java create mode 100644 jrt/src/com/yahoo/jrt/Worker.java create mode 100644 jrt/tests/com/yahoo/jrt/LatencyTest.java diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java deleted file mode 100644 index 71d99807253..00000000000 --- a/jrt/src/com/yahoo/jrt/Closer.java +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.jrt; - - -class Closer { - - private class Run implements Runnable { - public void run() { - try { - Closer.this.run(); - } catch (Throwable problem) { - parent.handleFailure(problem, Closer.this); - } - } - } - - private Thread thread = new Thread(new Run(), ""); - private Transport parent; - private ThreadQueue closeQueue = new ThreadQueue(); - - public Closer(Transport parent) { - this.parent = parent; - thread.setDaemon(true); - thread.start(); - } - - public void closeLater(Connection c) { - if (!closeQueue.enqueue(c)) { - c.closeSocket(); - } - } - - private void run() { - try { - while (true) { - ((Connection)closeQueue.dequeue()).closeSocket(); - } - } catch (EndOfQueueException e) {} - } - - public Closer shutdown() { - closeQueue.close(); - return this; - } - - public void join() { - while (true) { - try { - thread.join(); - return; - } catch (InterruptedException e) {} - } - } -} diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index df7acf881df..36e1f1c8a8e 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -41,6 +41,7 @@ class Connection extends Target { private Map watchers = new IdentityHashMap<>(); private int activeReqs = 0; private int writeWork = 0; + private boolean pendingHandshakeWork = false; private Transport parent; private Supervisor owner; private Spec spec; @@ -217,11 +218,10 @@ class Connection extends Target { } private void handshake() throws IOException { - HandshakeResult result; - while ((result = socket.handshake()) == HandshakeResult.NEED_WORK) { - socket.doHandshakeWork(); + if (pendingHandshakeWork) { + return; } - switch (result) { + switch (socket.handshake()) { case DONE: if (socket.getMinimumReadBufferSize() > readSize) { readSize = socket.getMinimumReadBufferSize(); @@ -239,6 +239,28 @@ class Connection extends Target { disableRead(); enableWrite(); break; + case NEED_WORK: + disableRead(); + disableWrite(); + pendingHandshakeWork = true; + parent.doHandshakeWork(this); + break; + } + } + + public void doHandshakeWork() { + socket.doHandshakeWork(); + } + + public void handleHandshakeWorkDone() throws IOException { + if (!pendingHandshakeWork) { + throw new IllegalStateException("jrt: got unwanted handshake work done event"); + } + pendingHandshakeWork = false; + if (state == CONNECTING) { + handshake(); + } else { + throw new IOException("jrt: got handshake work done event in incompatible state: " + state); } } diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java index 119008d835d..b758a30214a 100644 --- a/jrt/src/com/yahoo/jrt/ThreadQueue.java +++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java @@ -21,8 +21,22 @@ class ThreadQueue * was closed * @param obj the object to enqueue **/ - public synchronized boolean enqueue(Object obj) { - if (closed) { + public boolean enqueue(Object obj) { + return enqueue(obj, Integer.MAX_VALUE); + } + + /** + * Enqueue an object on this queue. If the queue has been closed or + * the queue already contains too many items, the object will not be + * queued, and this method will return false. + * + * @return true if the object was enqueued, false if this queue + * was closed or too large + * @param obj the object to enqueue + * @param limit more elements than this means the queue is too large + **/ + public synchronized boolean enqueue(Object obj, int limit) { + if (closed || (queue.size() > limit)) { return false; } queue.enqueue(obj); diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 0a2f2a4b7cb..33ce6fe6ed0 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -52,6 +52,12 @@ public class Transport { public void run() { handleEnableWrite(conn); } } + private class HandshakeWorkDoneCmd implements Runnable { + private Connection conn; + HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; } + public void run() { handleHandshakeWorkDone(conn); } + } + private class SyncCmd implements Runnable { boolean done = false; public synchronized void waitDone() { @@ -73,7 +79,7 @@ public class Transport { private Queue queue; private Queue myQueue; private Connector connector; - private Closer closer; + private Worker worker; private Scheduler scheduler; private int state; private Selector selector; @@ -82,7 +88,7 @@ public class Transport { private void handleAddConnection(Connection conn) { if (conn.isClosed()) { if (conn.hasSocket()) { - closer.closeLater(conn); + worker.closeLater(conn); } return; } @@ -97,7 +103,7 @@ public class Transport { } conn.fini(); if (conn.hasSocket()) { - closer.closeLater(conn); + worker.closeLater(conn); } } @@ -108,6 +114,18 @@ public class Transport { conn.enableWrite(); } + private void handleHandshakeWorkDone(Connection conn) { + if (conn.isClosed()) { + return; + } + try { + conn.handleHandshakeWorkDone(); + } catch (IOException e) { + conn.setLostReason(e); + handleCloseConnection(conn); + } + } + private boolean postCommand(Runnable cmd) { boolean wakeup; synchronized (this) { @@ -174,7 +192,7 @@ public class Transport { queue = new Queue(); myQueue = new Queue(); connector = new Connector(this); - closer = new Closer(this); + worker = new Worker(this); scheduler = new Scheduler(System.currentTimeMillis()); state = OPEN; try { @@ -287,6 +305,20 @@ public class Transport { } } + void handshakeWorkDone(Connection conn) { + postCommand(new HandshakeWorkDoneCmd(conn)); + } + + /** + * Request that {@link Connection#doHandshakeWork()} be called (in any thread) + * followed by a call to {@link Connection#handleHandshakeWorkDone()} from the transport thread. + * + * @param conn the connection needing handshake work + */ + void doHandshakeWork(Connection conn) { + worker.doHandshakeWork(conn); + } + /** * Create a {@link Task} that can be scheduled for execution in * the transport thread. @@ -379,7 +411,7 @@ public class Transport { handleCloseConnection(conn); } try { selector.close(); } catch (Exception e) {} - closer.shutdown().join(); + worker.shutdown().join(); connector.exit().join(); try { cryptoEngine.close(); } catch (Exception e) {} } diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java new file mode 100644 index 00000000000..41aeaa9b4ed --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Worker.java @@ -0,0 +1,85 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class Worker { + + private static int WORK_LIMIT = 1024; + + private class Run implements Runnable { + public void run() { + try { + Worker.this.run(); + } catch (Throwable problem) { + parent.handleFailure(problem, Worker.this); + } + } + } + + private static class CloseSocket implements Runnable { + Connection connection; + CloseSocket(Connection c) { + connection = c; + } + public void run() { + connection.closeSocket(); + } + } + + private static class DoHandshakeWork implements Runnable { + private Connection connection; + DoHandshakeWork(Connection c) { + connection = c; + } + public void run() { + connection.doHandshakeWork(); + connection.transport().handshakeWorkDone(connection); + } + } + + private Thread thread = new Thread(new Run(), ""); + private Transport parent; + private ThreadQueue workQueue = new ThreadQueue(); + + public Worker(Transport parent) { + this.parent = parent; + thread.setDaemon(true); + thread.start(); + } + + private void doLater(Runnable r) { + if(!workQueue.enqueue(r, WORK_LIMIT)) { + r.run(); + } + } + + public void closeLater(Connection c) { + doLater(new CloseSocket(c)); + } + + public void doHandshakeWork(Connection c) { + doLater(new DoHandshakeWork(c)); + } + + private void run() { + try { + while (true) { + ((Runnable) workQueue.dequeue()).run(); + } + } catch (EndOfQueueException e) {} + } + + public Worker shutdown() { + workQueue.close(); + return this; + } + + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java new file mode 100644 index 00000000000..a1f71bda013 --- /dev/null +++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java @@ -0,0 +1,84 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.jrt; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Logger; + +import static com.yahoo.jrt.CryptoUtils.createTestTlsContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class LatencyTest { + private static final Logger log = Logger.getLogger(LatencyTest.class.getName()); + + private static class Server implements AutoCloseable { + private Supervisor orb; + private Acceptor acceptor; + public Server(CryptoEngine crypto) throws ListenFailedException { + orb = new Supervisor(new Transport(crypto)); + acceptor = orb.listen(new Spec(0)); + orb.addMethod(new Method("inc", "i", "i", this, "rpc_inc")); + } + public Target connect() { + return orb.connect(new Spec("localhost", acceptor.port())); + } + public void rpc_inc(Request req) { + req.returnValues().add(new Int32Value(req.parameters().get(0).asInt32() + 1)); + } + public void close() { + acceptor.shutdown().join(); + orb.transport().shutdown().join(); + } + } + + private void measureLatency(String prefix, Server server, boolean reconnect) { + int value = 100; + List list = new ArrayList<>(); + Target target = server.connect(); + for (int i = 0; i < 64; ++i) { + long before = System.nanoTime(); + if (reconnect) { + target.close(); + target = server.connect(); + } + Request req = new Request("inc"); + req.parameters().add(new Int32Value(value)); + target.invokeSync(req, 60.0); + assertTrue(req.checkReturnTypes("i")); + assertEquals(value + 1, req.returnValues().get(0).asInt32()); + value++; + long duration = System.nanoTime() - before; + list.add(duration / 1000000.0); + } + target.close(); + Collections.sort(list); + log.info(prefix + "invocation latency: " + list.get(list.size() / 2) + " ms"); + } + + @org.junit.Test + public void testNullCryptoLatency() throws ListenFailedException { + try (Server server = new Server(new NullCryptoEngine())) { + measureLatency("[null crypto, no reconnect] ", server, false); + measureLatency("[null crypto, reconnect] ", server, true); + } + } + + @org.junit.Test + public void testXorCryptoLatency() throws ListenFailedException { + try (Server server = new Server(new XorCryptoEngine())) { + measureLatency("[xor crypto, no reconnect] ", server, false); + measureLatency("[xor crypto, reconnect] ", server, true); + } + } + + @org.junit.Test + public void testTlsCryptoLatency() throws ListenFailedException { + try (Server server = new Server(new TlsCryptoEngine(createTestTlsContext()))) { + measureLatency("[tls crypto, no reconnect] ", server, false); + measureLatency("[tls crypto, reconnect] ", server, true); + } + } +} -- cgit v1.2.3