From af26e7921e852ba6335a62bcf160339adfa195aa Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Wed, 27 Feb 2019 19:30:52 +0100 Subject: Revert "async tls handshake in jrt" --- jrt/src/com/yahoo/jrt/Closer.java | 54 ++++++++++++++++++++ jrt/src/com/yahoo/jrt/Connection.java | 30 ++--------- jrt/src/com/yahoo/jrt/ThreadQueue.java | 18 +------ jrt/src/com/yahoo/jrt/Transport.java | 42 ++-------------- jrt/src/com/yahoo/jrt/Worker.java | 85 -------------------------------- jrt/tests/com/yahoo/jrt/LatencyTest.java | 84 ------------------------------- 6 files changed, 65 insertions(+), 248 deletions(-) create mode 100644 jrt/src/com/yahoo/jrt/Closer.java delete mode 100644 jrt/src/com/yahoo/jrt/Worker.java delete mode 100644 jrt/tests/com/yahoo/jrt/LatencyTest.java diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java new file mode 100644 index 00000000000..71d99807253 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Closer.java @@ -0,0 +1,54 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class Closer { + + private class Run implements Runnable { + public void run() { + try { + Closer.this.run(); + } catch (Throwable problem) { + parent.handleFailure(problem, Closer.this); + } + } + } + + private Thread thread = new Thread(new Run(), ""); + private Transport parent; + private ThreadQueue closeQueue = new ThreadQueue(); + + public Closer(Transport parent) { + this.parent = parent; + thread.setDaemon(true); + thread.start(); + } + + public void closeLater(Connection c) { + if (!closeQueue.enqueue(c)) { + c.closeSocket(); + } + } + + private void run() { + try { + while (true) { + ((Connection)closeQueue.dequeue()).closeSocket(); + } + } catch (EndOfQueueException e) {} + } + + public Closer shutdown() { + closeQueue.close(); + return this; + } + + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index 36e1f1c8a8e..df7acf881df 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -41,7 +41,6 @@ class Connection extends Target { private Map watchers = new IdentityHashMap<>(); private int activeReqs = 0; private int writeWork = 0; - private boolean pendingHandshakeWork = false; private Transport parent; private Supervisor owner; private Spec spec; @@ -218,10 +217,11 @@ class Connection extends Target { } private void handshake() throws IOException { - if (pendingHandshakeWork) { - return; + HandshakeResult result; + while ((result = socket.handshake()) == HandshakeResult.NEED_WORK) { + socket.doHandshakeWork(); } - switch (socket.handshake()) { + switch (result) { case DONE: if (socket.getMinimumReadBufferSize() > readSize) { readSize = socket.getMinimumReadBufferSize(); @@ -239,28 +239,6 @@ class Connection extends Target { disableRead(); enableWrite(); break; - case NEED_WORK: - disableRead(); - disableWrite(); - pendingHandshakeWork = true; - parent.doHandshakeWork(this); - break; - } - } - - public void doHandshakeWork() { - socket.doHandshakeWork(); - } - - public void handleHandshakeWorkDone() throws IOException { - if (!pendingHandshakeWork) { - throw new IllegalStateException("jrt: got unwanted handshake work done event"); - } - pendingHandshakeWork = false; - if (state == CONNECTING) { - handshake(); - } else { - throw new IOException("jrt: got handshake work done event in incompatible state: " + state); } } diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java index b758a30214a..119008d835d 100644 --- a/jrt/src/com/yahoo/jrt/ThreadQueue.java +++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java @@ -21,22 +21,8 @@ class ThreadQueue * was closed * @param obj the object to enqueue **/ - public boolean enqueue(Object obj) { - return enqueue(obj, Integer.MAX_VALUE); - } - - /** - * Enqueue an object on this queue. If the queue has been closed or - * the queue already contains too many items, the object will not be - * queued, and this method will return false. - * - * @return true if the object was enqueued, false if this queue - * was closed or too large - * @param obj the object to enqueue - * @param limit more elements than this means the queue is too large - **/ - public synchronized boolean enqueue(Object obj, int limit) { - if (closed || (queue.size() > limit)) { + public synchronized boolean enqueue(Object obj) { + if (closed) { return false; } queue.enqueue(obj); diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 33ce6fe6ed0..0a2f2a4b7cb 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -52,12 +52,6 @@ public class Transport { public void run() { handleEnableWrite(conn); } } - private class HandshakeWorkDoneCmd implements Runnable { - private Connection conn; - HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; } - public void run() { handleHandshakeWorkDone(conn); } - } - private class SyncCmd implements Runnable { boolean done = false; public synchronized void waitDone() { @@ -79,7 +73,7 @@ public class Transport { private Queue queue; private Queue myQueue; private Connector connector; - private Worker worker; + private Closer closer; private Scheduler scheduler; private int state; private Selector selector; @@ -88,7 +82,7 @@ public class Transport { private void handleAddConnection(Connection conn) { if (conn.isClosed()) { if (conn.hasSocket()) { - worker.closeLater(conn); + closer.closeLater(conn); } return; } @@ -103,7 +97,7 @@ public class Transport { } conn.fini(); if (conn.hasSocket()) { - worker.closeLater(conn); + closer.closeLater(conn); } } @@ -114,18 +108,6 @@ public class Transport { conn.enableWrite(); } - private void handleHandshakeWorkDone(Connection conn) { - if (conn.isClosed()) { - return; - } - try { - conn.handleHandshakeWorkDone(); - } catch (IOException e) { - conn.setLostReason(e); - handleCloseConnection(conn); - } - } - private boolean postCommand(Runnable cmd) { boolean wakeup; synchronized (this) { @@ -192,7 +174,7 @@ public class Transport { queue = new Queue(); myQueue = new Queue(); connector = new Connector(this); - worker = new Worker(this); + closer = new Closer(this); scheduler = new Scheduler(System.currentTimeMillis()); state = OPEN; try { @@ -305,20 +287,6 @@ public class Transport { } } - void handshakeWorkDone(Connection conn) { - postCommand(new HandshakeWorkDoneCmd(conn)); - } - - /** - * Request that {@link Connection#doHandshakeWork()} be called (in any thread) - * followed by a call to {@link Connection#handleHandshakeWorkDone()} from the transport thread. - * - * @param conn the connection needing handshake work - */ - void doHandshakeWork(Connection conn) { - worker.doHandshakeWork(conn); - } - /** * Create a {@link Task} that can be scheduled for execution in * the transport thread. @@ -411,7 +379,7 @@ public class Transport { handleCloseConnection(conn); } try { selector.close(); } catch (Exception e) {} - worker.shutdown().join(); + closer.shutdown().join(); connector.exit().join(); try { cryptoEngine.close(); } catch (Exception e) {} } diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java deleted file mode 100644 index 41aeaa9b4ed..00000000000 --- a/jrt/src/com/yahoo/jrt/Worker.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.jrt; - - -class Worker { - - private static int WORK_LIMIT = 1024; - - private class Run implements Runnable { - public void run() { - try { - Worker.this.run(); - } catch (Throwable problem) { - parent.handleFailure(problem, Worker.this); - } - } - } - - private static class CloseSocket implements Runnable { - Connection connection; - CloseSocket(Connection c) { - connection = c; - } - public void run() { - connection.closeSocket(); - } - } - - private static class DoHandshakeWork implements Runnable { - private Connection connection; - DoHandshakeWork(Connection c) { - connection = c; - } - public void run() { - connection.doHandshakeWork(); - connection.transport().handshakeWorkDone(connection); - } - } - - private Thread thread = new Thread(new Run(), ""); - private Transport parent; - private ThreadQueue workQueue = new ThreadQueue(); - - public Worker(Transport parent) { - this.parent = parent; - thread.setDaemon(true); - thread.start(); - } - - private void doLater(Runnable r) { - if(!workQueue.enqueue(r, WORK_LIMIT)) { - r.run(); - } - } - - public void closeLater(Connection c) { - doLater(new CloseSocket(c)); - } - - public void doHandshakeWork(Connection c) { - doLater(new DoHandshakeWork(c)); - } - - private void run() { - try { - while (true) { - ((Runnable) workQueue.dequeue()).run(); - } - } catch (EndOfQueueException e) {} - } - - public Worker shutdown() { - workQueue.close(); - return this; - } - - public void join() { - while (true) { - try { - thread.join(); - return; - } catch (InterruptedException e) {} - } - } -} diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java deleted file mode 100644 index a1f71bda013..00000000000 --- a/jrt/tests/com/yahoo/jrt/LatencyTest.java +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.jrt; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.logging.Logger; - -import static com.yahoo.jrt.CryptoUtils.createTestTlsContext; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class LatencyTest { - private static final Logger log = Logger.getLogger(LatencyTest.class.getName()); - - private static class Server implements AutoCloseable { - private Supervisor orb; - private Acceptor acceptor; - public Server(CryptoEngine crypto) throws ListenFailedException { - orb = new Supervisor(new Transport(crypto)); - acceptor = orb.listen(new Spec(0)); - orb.addMethod(new Method("inc", "i", "i", this, "rpc_inc")); - } - public Target connect() { - return orb.connect(new Spec("localhost", acceptor.port())); - } - public void rpc_inc(Request req) { - req.returnValues().add(new Int32Value(req.parameters().get(0).asInt32() + 1)); - } - public void close() { - acceptor.shutdown().join(); - orb.transport().shutdown().join(); - } - } - - private void measureLatency(String prefix, Server server, boolean reconnect) { - int value = 100; - List list = new ArrayList<>(); - Target target = server.connect(); - for (int i = 0; i < 64; ++i) { - long before = System.nanoTime(); - if (reconnect) { - target.close(); - target = server.connect(); - } - Request req = new Request("inc"); - req.parameters().add(new Int32Value(value)); - target.invokeSync(req, 60.0); - assertTrue(req.checkReturnTypes("i")); - assertEquals(value + 1, req.returnValues().get(0).asInt32()); - value++; - long duration = System.nanoTime() - before; - list.add(duration / 1000000.0); - } - target.close(); - Collections.sort(list); - log.info(prefix + "invocation latency: " + list.get(list.size() / 2) + " ms"); - } - - @org.junit.Test - public void testNullCryptoLatency() throws ListenFailedException { - try (Server server = new Server(new NullCryptoEngine())) { - measureLatency("[null crypto, no reconnect] ", server, false); - measureLatency("[null crypto, reconnect] ", server, true); - } - } - - @org.junit.Test - public void testXorCryptoLatency() throws ListenFailedException { - try (Server server = new Server(new XorCryptoEngine())) { - measureLatency("[xor crypto, no reconnect] ", server, false); - measureLatency("[xor crypto, reconnect] ", server, true); - } - } - - @org.junit.Test - public void testTlsCryptoLatency() throws ListenFailedException { - try (Server server = new Server(new TlsCryptoEngine(createTestTlsContext()))) { - measureLatency("[tls crypto, no reconnect] ", server, false); - measureLatency("[tls crypto, reconnect] ", server, true); - } - } -} -- cgit v1.2.3