summaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2019-02-27 19:30:52 +0100
committerGitHub <noreply@github.com>2019-02-27 19:30:52 +0100
commitaf26e7921e852ba6335a62bcf160339adfa195aa (patch)
tree4eb67d98b4081f90129ed10075fe465a2aac5a73 /jrt
parentbfe00f042aab30166e507f8b71590be688d26443 (diff)
Revert "async tls handshake in jrt"
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Closer.java54
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java30
-rw-r--r--jrt/src/com/yahoo/jrt/ThreadQueue.java18
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java42
-rw-r--r--jrt/src/com/yahoo/jrt/Worker.java85
-rw-r--r--jrt/tests/com/yahoo/jrt/LatencyTest.java84
6 files changed, 65 insertions, 248 deletions
diff --git a/jrt/src/com/yahoo/jrt/Closer.java b/jrt/src/com/yahoo/jrt/Closer.java
new file mode 100644
index 00000000000..71d99807253
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/Closer.java
@@ -0,0 +1,54 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+class Closer {
+
+ private class Run implements Runnable {
+ public void run() {
+ try {
+ Closer.this.run();
+ } catch (Throwable problem) {
+ parent.handleFailure(problem, Closer.this);
+ }
+ }
+ }
+
+ private Thread thread = new Thread(new Run(), "<jrt-closer>");
+ private Transport parent;
+ private ThreadQueue closeQueue = new ThreadQueue();
+
+ public Closer(Transport parent) {
+ this.parent = parent;
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void closeLater(Connection c) {
+ if (!closeQueue.enqueue(c)) {
+ c.closeSocket();
+ }
+ }
+
+ private void run() {
+ try {
+ while (true) {
+ ((Connection)closeQueue.dequeue()).closeSocket();
+ }
+ } catch (EndOfQueueException e) {}
+ }
+
+ public Closer shutdown() {
+ closeQueue.close();
+ return this;
+ }
+
+ public void join() {
+ while (true) {
+ try {
+ thread.join();
+ return;
+ } catch (InterruptedException e) {}
+ }
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index 36e1f1c8a8e..df7acf881df 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -41,7 +41,6 @@ class Connection extends Target {
private Map<TargetWatcher, TargetWatcher> watchers = new IdentityHashMap<>();
private int activeReqs = 0;
private int writeWork = 0;
- private boolean pendingHandshakeWork = false;
private Transport parent;
private Supervisor owner;
private Spec spec;
@@ -218,10 +217,11 @@ class Connection extends Target {
}
private void handshake() throws IOException {
- if (pendingHandshakeWork) {
- return;
+ HandshakeResult result;
+ while ((result = socket.handshake()) == HandshakeResult.NEED_WORK) {
+ socket.doHandshakeWork();
}
- switch (socket.handshake()) {
+ switch (result) {
case DONE:
if (socket.getMinimumReadBufferSize() > readSize) {
readSize = socket.getMinimumReadBufferSize();
@@ -239,28 +239,6 @@ class Connection extends Target {
disableRead();
enableWrite();
break;
- case NEED_WORK:
- disableRead();
- disableWrite();
- pendingHandshakeWork = true;
- parent.doHandshakeWork(this);
- break;
- }
- }
-
- public void doHandshakeWork() {
- socket.doHandshakeWork();
- }
-
- public void handleHandshakeWorkDone() throws IOException {
- if (!pendingHandshakeWork) {
- throw new IllegalStateException("jrt: got unwanted handshake work done event");
- }
- pendingHandshakeWork = false;
- if (state == CONNECTING) {
- handshake();
- } else {
- throw new IOException("jrt: got handshake work done event in incompatible state: " + state);
}
}
diff --git a/jrt/src/com/yahoo/jrt/ThreadQueue.java b/jrt/src/com/yahoo/jrt/ThreadQueue.java
index b758a30214a..119008d835d 100644
--- a/jrt/src/com/yahoo/jrt/ThreadQueue.java
+++ b/jrt/src/com/yahoo/jrt/ThreadQueue.java
@@ -21,22 +21,8 @@ class ThreadQueue
* was closed
* @param obj the object to enqueue
**/
- public boolean enqueue(Object obj) {
- return enqueue(obj, Integer.MAX_VALUE);
- }
-
- /**
- * Enqueue an object on this queue. If the queue has been closed or
- * the queue already contains too many items, the object will not be
- * queued, and this method will return false.
- *
- * @return true if the object was enqueued, false if this queue
- * was closed or too large
- * @param obj the object to enqueue
- * @param limit more elements than this means the queue is too large
- **/
- public synchronized boolean enqueue(Object obj, int limit) {
- if (closed || (queue.size() > limit)) {
+ public synchronized boolean enqueue(Object obj) {
+ if (closed) {
return false;
}
queue.enqueue(obj);
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 33ce6fe6ed0..0a2f2a4b7cb 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -52,12 +52,6 @@ public class Transport {
public void run() { handleEnableWrite(conn); }
}
- private class HandshakeWorkDoneCmd implements Runnable {
- private Connection conn;
- HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; }
- public void run() { handleHandshakeWorkDone(conn); }
- }
-
private class SyncCmd implements Runnable {
boolean done = false;
public synchronized void waitDone() {
@@ -79,7 +73,7 @@ public class Transport {
private Queue queue;
private Queue myQueue;
private Connector connector;
- private Worker worker;
+ private Closer closer;
private Scheduler scheduler;
private int state;
private Selector selector;
@@ -88,7 +82,7 @@ public class Transport {
private void handleAddConnection(Connection conn) {
if (conn.isClosed()) {
if (conn.hasSocket()) {
- worker.closeLater(conn);
+ closer.closeLater(conn);
}
return;
}
@@ -103,7 +97,7 @@ public class Transport {
}
conn.fini();
if (conn.hasSocket()) {
- worker.closeLater(conn);
+ closer.closeLater(conn);
}
}
@@ -114,18 +108,6 @@ public class Transport {
conn.enableWrite();
}
- private void handleHandshakeWorkDone(Connection conn) {
- if (conn.isClosed()) {
- return;
- }
- try {
- conn.handleHandshakeWorkDone();
- } catch (IOException e) {
- conn.setLostReason(e);
- handleCloseConnection(conn);
- }
- }
-
private boolean postCommand(Runnable cmd) {
boolean wakeup;
synchronized (this) {
@@ -192,7 +174,7 @@ public class Transport {
queue = new Queue();
myQueue = new Queue();
connector = new Connector(this);
- worker = new Worker(this);
+ closer = new Closer(this);
scheduler = new Scheduler(System.currentTimeMillis());
state = OPEN;
try {
@@ -305,20 +287,6 @@ public class Transport {
}
}
- void handshakeWorkDone(Connection conn) {
- postCommand(new HandshakeWorkDoneCmd(conn));
- }
-
- /**
- * Request that {@link Connection#doHandshakeWork()} be called (in any thread)
- * followed by a call to {@link Connection#handleHandshakeWorkDone()} from the transport thread.
- *
- * @param conn the connection needing handshake work
- */
- void doHandshakeWork(Connection conn) {
- worker.doHandshakeWork(conn);
- }
-
/**
* Create a {@link Task} that can be scheduled for execution in
* the transport thread.
@@ -411,7 +379,7 @@ public class Transport {
handleCloseConnection(conn);
}
try { selector.close(); } catch (Exception e) {}
- worker.shutdown().join();
+ closer.shutdown().join();
connector.exit().join();
try { cryptoEngine.close(); } catch (Exception e) {}
}
diff --git a/jrt/src/com/yahoo/jrt/Worker.java b/jrt/src/com/yahoo/jrt/Worker.java
deleted file mode 100644
index 41aeaa9b4ed..00000000000
--- a/jrt/src/com/yahoo/jrt/Worker.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.jrt;
-
-
-class Worker {
-
- private static int WORK_LIMIT = 1024;
-
- private class Run implements Runnable {
- public void run() {
- try {
- Worker.this.run();
- } catch (Throwable problem) {
- parent.handleFailure(problem, Worker.this);
- }
- }
- }
-
- private static class CloseSocket implements Runnable {
- Connection connection;
- CloseSocket(Connection c) {
- connection = c;
- }
- public void run() {
- connection.closeSocket();
- }
- }
-
- private static class DoHandshakeWork implements Runnable {
- private Connection connection;
- DoHandshakeWork(Connection c) {
- connection = c;
- }
- public void run() {
- connection.doHandshakeWork();
- connection.transport().handshakeWorkDone(connection);
- }
- }
-
- private Thread thread = new Thread(new Run(), "<jrt-worker>");
- private Transport parent;
- private ThreadQueue workQueue = new ThreadQueue();
-
- public Worker(Transport parent) {
- this.parent = parent;
- thread.setDaemon(true);
- thread.start();
- }
-
- private void doLater(Runnable r) {
- if(!workQueue.enqueue(r, WORK_LIMIT)) {
- r.run();
- }
- }
-
- public void closeLater(Connection c) {
- doLater(new CloseSocket(c));
- }
-
- public void doHandshakeWork(Connection c) {
- doLater(new DoHandshakeWork(c));
- }
-
- private void run() {
- try {
- while (true) {
- ((Runnable) workQueue.dequeue()).run();
- }
- } catch (EndOfQueueException e) {}
- }
-
- public Worker shutdown() {
- workQueue.close();
- return this;
- }
-
- public void join() {
- while (true) {
- try {
- thread.join();
- return;
- } catch (InterruptedException e) {}
- }
- }
-}
diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java
deleted file mode 100644
index a1f71bda013..00000000000
--- a/jrt/tests/com/yahoo/jrt/LatencyTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-package com.yahoo.jrt;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.logging.Logger;
-
-import static com.yahoo.jrt.CryptoUtils.createTestTlsContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class LatencyTest {
- private static final Logger log = Logger.getLogger(LatencyTest.class.getName());
-
- private static class Server implements AutoCloseable {
- private Supervisor orb;
- private Acceptor acceptor;
- public Server(CryptoEngine crypto) throws ListenFailedException {
- orb = new Supervisor(new Transport(crypto));
- acceptor = orb.listen(new Spec(0));
- orb.addMethod(new Method("inc", "i", "i", this, "rpc_inc"));
- }
- public Target connect() {
- return orb.connect(new Spec("localhost", acceptor.port()));
- }
- public void rpc_inc(Request req) {
- req.returnValues().add(new Int32Value(req.parameters().get(0).asInt32() + 1));
- }
- public void close() {
- acceptor.shutdown().join();
- orb.transport().shutdown().join();
- }
- }
-
- private void measureLatency(String prefix, Server server, boolean reconnect) {
- int value = 100;
- List<Double> list = new ArrayList<>();
- Target target = server.connect();
- for (int i = 0; i < 64; ++i) {
- long before = System.nanoTime();
- if (reconnect) {
- target.close();
- target = server.connect();
- }
- Request req = new Request("inc");
- req.parameters().add(new Int32Value(value));
- target.invokeSync(req, 60.0);
- assertTrue(req.checkReturnTypes("i"));
- assertEquals(value + 1, req.returnValues().get(0).asInt32());
- value++;
- long duration = System.nanoTime() - before;
- list.add(duration / 1000000.0);
- }
- target.close();
- Collections.sort(list);
- log.info(prefix + "invocation latency: " + list.get(list.size() / 2) + " ms");
- }
-
- @org.junit.Test
- public void testNullCryptoLatency() throws ListenFailedException {
- try (Server server = new Server(new NullCryptoEngine())) {
- measureLatency("[null crypto, no reconnect] ", server, false);
- measureLatency("[null crypto, reconnect] ", server, true);
- }
- }
-
- @org.junit.Test
- public void testXorCryptoLatency() throws ListenFailedException {
- try (Server server = new Server(new XorCryptoEngine())) {
- measureLatency("[xor crypto, no reconnect] ", server, false);
- measureLatency("[xor crypto, reconnect] ", server, true);
- }
- }
-
- @org.junit.Test
- public void testTlsCryptoLatency() throws ListenFailedException {
- try (Server server = new Server(new TlsCryptoEngine(createTestTlsContext()))) {
- measureLatency("[tls crypto, no reconnect] ", server, false);
- measureLatency("[tls crypto, reconnect] ", server, true);
- }
- }
-}