diff options
author | Harald Musum <musum@verizonmedia.com> | 2019-02-27 19:30:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-27 19:30:52 +0100 |
commit | af26e7921e852ba6335a62bcf160339adfa195aa (patch) | |
tree | 4eb67d98b4081f90129ed10075fe465a2aac5a73 /jrt/src/com | |
parent | bfe00f042aab30166e507f8b71590be688d26443 (diff) |
Revert "async tls handshake in jrt"
Diffstat (limited to 'jrt/src/com')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Closer.java | 54 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connection.java | 30 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/ThreadQueue.java | 18 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 42 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Worker.java | 85 |
5 files changed, 65 insertions, 164 deletions
diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java new file mode 100644 index 00000000000..71d99807253 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/Closer.java @@ -0,0 +1,54 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +class Closer { + + private class Run implements Runnable { + public void run() { + try { + Closer.this.run(); + } catch (Throwable problem) { + parent.handleFailure(problem, Closer.this); + } + } + } + + private Thread thread = new Thread(new Run(), "<jrt-closer>"); + private Transport parent; + private ThreadQueue closeQueue = new ThreadQueue(); + + public Closer(Transport parent) { + this.parent = parent; + thread.setDaemon(true); + thread.start(); + } + + public void closeLater(Connection c) { + if (!closeQueue.enqueue(c)) { + c.closeSocket(); + } + } + + private void run() { + try { + while (true) { + ((Connection)closeQueue.dequeue()).closeSocket(); + } + } catch (EndOfQueueException e) {} + } + + public Closer shutdown() { + closeQueue.close(); + return this; + } + + public void join() { + while (true) { + try { + thread.join(); + return; + } catch (InterruptedException e) {} + } + } +} diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index 36e1f1c8a8e..df7acf881df 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -41,7 +41,6 @@ class Connection extends Target { private Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>(); private int activeReqs = 0; private int writeWork = 0; - private boolean pendingHandshakeWork = false; private Transport parent; private Supervisor owner; private Spec spec; @@ -218,10 +217,11 @@ class Connection extends Target { } private void handshake() throws IOException { - if (pendingHandshakeWork) { - return; + HandshakeResult result; + while ((result = socket.handshake()) == HandshakeResult.NEED_WORK) { + socket.doHandshakeWork(); } - switch (socket.handshake()) { + switch (result) { case DONE: if (socket.getMinimumReadBufferSize() > readSize) { readSize = socket.getMinimumReadBufferSize(); @@ -239,28 +239,6 @@ class Connection extends Target { disableRead(); enableWrite(); break; - case NEED_WORK: - disableRead(); - disableWrite(); - pendingHandshakeWork = true; - parent.doHandshakeWork(this); - break; - } - } - - public void doHandshakeWork() { - socket.doHandshakeWork(); - } - - public void handleHandshakeWorkDone() throws IOException { - if (!pendingHandshakeWork) { - throw new IllegalStateException("jrt: got unwanted handshake work done event"); - } - pendingHandshakeWork = false; - if (state == CONNECTING) { - handshake(); - } else { - throw new IOException("jrt: got handshake work done event in incompatible state: " + state); } } diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java index b758a30214a..119008d835d 100644 --- a/jrt/src/com/yahoo/jrt/ThreadQueue.java +++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java @@ -21,22 +21,8 @@ class ThreadQueue * was closed * @param obj the object to enqueue **/ - public boolean enqueue(Object obj) { - return enqueue(obj, Integer.MAX_VALUE); - } - - /** - * Enqueue an object on this queue. If the queue has been closed or - * the queue already contains too many items, the object will not be - * queued, and this method will return false. - * - * @return true if the object was enqueued, false if this queue - * was closed or too large - * @param obj the object to enqueue - * @param limit more elements than this means the queue is too large - **/ - public synchronized boolean enqueue(Object obj, int limit) { - if (closed || (queue.size() > limit)) { + public synchronized boolean enqueue(Object obj) { + if (closed) { return false; } queue.enqueue(obj); diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 33ce6fe6ed0..0a2f2a4b7cb 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -52,12 +52,6 @@ public class Transport { public void run() { handleEnableWrite(conn); } } - private class HandshakeWorkDoneCmd implements Runnable { - private Connection conn; - HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; } - public void run() { handleHandshakeWorkDone(conn); } - } - private class SyncCmd implements Runnable { boolean done = false; public synchronized void waitDone() { @@ -79,7 +73,7 @@ public class Transport { private Queue queue; private Queue myQueue; private Connector connector; - private Worker worker; + private Closer closer; private Scheduler scheduler; private int state; private Selector selector; @@ -88,7 +82,7 @@ public class Transport { private void handleAddConnection(Connection conn) { if (conn.isClosed()) { if (conn.hasSocket()) { - worker.closeLater(conn); + closer.closeLater(conn); } return; } @@ -103,7 +97,7 @@ public class Transport { } conn.fini(); if (conn.hasSocket()) { - worker.closeLater(conn); + closer.closeLater(conn); } } @@ -114,18 +108,6 @@ public class Transport { conn.enableWrite(); } - private void handleHandshakeWorkDone(Connection conn) { - if (conn.isClosed()) { - return; - } - try { - conn.handleHandshakeWorkDone(); - } catch (IOException e) { - conn.setLostReason(e); - handleCloseConnection(conn); - } - } - private boolean postCommand(Runnable cmd) { boolean wakeup; synchronized (this) { @@ -192,7 +174,7 @@ public class Transport { queue = new Queue(); myQueue = new Queue(); connector = new Connector(this); - worker = new Worker(this); + closer = new Closer(this); scheduler = new Scheduler(System.currentTimeMillis()); state = OPEN; try { @@ -305,20 +287,6 @@ public class Transport { } } - void handshakeWorkDone(Connection conn) { - postCommand(new HandshakeWorkDoneCmd(conn)); - } - - /** - * Request that {@link Connection#doHandshakeWork()} be called (in any thread) - * followed by a call to {@link Connection#handleHandshakeWorkDone()} from the transport thread. - * - * @param conn the connection needing handshake work - */ - void doHandshakeWork(Connection conn) { - worker.doHandshakeWork(conn); - } - /** * Create a {@link Task} that can be scheduled for execution in * the transport thread. @@ -411,7 +379,7 @@ public class Transport { handleCloseConnection(conn); } try { selector.close(); } catch (Exception e) {} - worker.shutdown().join(); + closer.shutdown().join(); connector.exit().join(); try { cryptoEngine.close(); } catch (Exception e) {} } diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java deleted file mode 100644 index 41aeaa9b4ed..00000000000 --- a/jrt/src/com/yahoo/jrt/Worker.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.jrt; - - -class Worker { - - private static int WORK_LIMIT = 1024; - - private class Run implements Runnable { - public void run() { - try { - Worker.this.run(); - } catch (Throwable problem) { - parent.handleFailure(problem, Worker.this); - } - } - } - - private static class CloseSocket implements Runnable { - Connection connection; - CloseSocket(Connection c) { - connection = c; - } - public void run() { - connection.closeSocket(); - } - } - - private static class DoHandshakeWork implements Runnable { - private Connection connection; - DoHandshakeWork(Connection c) { - connection = c; - } - public void run() { - connection.doHandshakeWork(); - connection.transport().handshakeWorkDone(connection); - } - } - - private Thread thread = new Thread(new Run(), "<jrt-worker>"); - private Transport parent; - private ThreadQueue workQueue = new ThreadQueue(); - - public Worker(Transport parent) { - this.parent = parent; - thread.setDaemon(true); - thread.start(); - } - - private void doLater(Runnable r) { - if(!workQueue.enqueue(r, WORK_LIMIT)) { - r.run(); - } - } - - public void closeLater(Connection c) { - doLater(new CloseSocket(c)); - } - - public void doHandshakeWork(Connection c) { - doLater(new DoHandshakeWork(c)); - } - - private void run() { - try { - while (true) { - ((Runnable) workQueue.dequeue()).run(); - } - } catch (EndOfQueueException e) {} - } - - public Worker shutdown() { - workQueue.close(); - return this; - } - - public void join() { - while (true) { - try { - thread.join(); - return; - } catch (InterruptedException e) {} - } - } -} |