summaryrefslogtreecommitdiffstats
path: root/jrt/src/com/yahoo/jrt/Transport.java
diff options
context:
space:
mode:
Diffstat (limited to 'jrt/src/com/yahoo/jrt/Transport.java')
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java367
1 files changed, 55 insertions, 312 deletions
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 33ce6fe6ed0..f4eb1acd096 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -2,15 +2,14 @@
package com.yahoo.jrt;
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
-
/**
* The Transport class is the core needed to make your {@link
* Supervisor} tick. It implements the reactor pattern to perform
@@ -20,159 +19,17 @@ import java.util.logging.Logger;
**/
public class Transport {
- private static final int OPEN = 1;
- private static final int CLOSING = 2;
- private static final int CLOSED = 3;
-
- private class Run implements Runnable {
- public void run() {
- try {
- Transport.this.run();
- } catch (Throwable problem) {
- handleFailure(problem, Transport.this);
- }
- }
- }
-
- private class AddConnectionCmd implements Runnable {
- private Connection conn;
- AddConnectionCmd(Connection conn) { this.conn = conn; }
- public void run() { handleAddConnection(conn); }
- }
-
- private class CloseConnectionCmd implements Runnable {
- private Connection conn;
- CloseConnectionCmd(Connection conn) { this.conn = conn; }
- public void run() { handleCloseConnection(conn); }
- }
-
- private class EnableWriteCmd implements Runnable {
- private Connection conn;
- EnableWriteCmd(Connection conn) { this.conn = conn; }
- public void run() { handleEnableWrite(conn); }
- }
-
- private class HandshakeWorkDoneCmd implements Runnable {
- private Connection conn;
- HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; }
- public void run() { handleHandshakeWorkDone(conn); }
- }
-
- private class SyncCmd implements Runnable {
- boolean done = false;
- public synchronized void waitDone() {
- while (!done) {
- try { wait(); } catch (InterruptedException e) {}
- }
- }
- public synchronized void run() {
- done = true;
- notify();
- }
- }
-
private static Logger log = Logger.getLogger(Transport.class.getName());
- private FatalErrorHandler fatalHandler; // NB: this must be set first
- private CryptoEngine cryptoEngine;
- private Thread thread;
- private Queue queue;
- private Queue myQueue;
- private Connector connector;
- private Worker worker;
- private Scheduler scheduler;
- private int state;
- private Selector selector;
- private final TransportMetrics metrics = TransportMetrics.getInstance();
+ private final FatalErrorHandler fatalHandler; // NB: this must be set first
+ private final CryptoEngine cryptoEngine;
+ private final Connector connector;
+ private final Worker worker;
+ private final AtomicInteger runCnt;
- private void handleAddConnection(Connection conn) {
- if (conn.isClosed()) {
- if (conn.hasSocket()) {
- worker.closeLater(conn);
- }
- return;
- }
- if (!conn.init(selector)) {
- handleCloseConnection(conn);
- }
- }
-
- private void handleCloseConnection(Connection conn) {
- if (conn.isClosed()) {
- return;
- }
- conn.fini();
- if (conn.hasSocket()) {
- worker.closeLater(conn);
- }
- }
-
- private void handleEnableWrite(Connection conn) {
- if (conn.isClosed()) {
- return;
- }
- conn.enableWrite();
- }
-
- private void handleHandshakeWorkDone(Connection conn) {
- if (conn.isClosed()) {
- return;
- }
- try {
- conn.handleHandshakeWorkDone();
- } catch (IOException e) {
- conn.setLostReason(e);
- handleCloseConnection(conn);
- }
- }
-
- private boolean postCommand(Runnable cmd) {
- boolean wakeup;
- synchronized (this) {
- if (state == CLOSED) {
- return false;
- }
- wakeup = queue.isEmpty();
- queue.enqueue(cmd);
- }
- if (wakeup) {
- selector.wakeup();
- }
- return true;
- }
-
- private void handleEvents() {
- synchronized (this) {
- queue.flush(myQueue);
- }
- while (!myQueue.isEmpty()) {
- ((Runnable)myQueue.dequeue()).run();
- }
- }
-
- private boolean handleIOEvents(Connection conn,
- SelectionKey key) {
- if (conn.isClosed()) {
- return true;
- }
- if (key.isReadable()) {
- try {
- conn.handleReadEvent();
- } catch (IOException e) {
- conn.setLostReason(e);
- return false;
- }
- }
- if (key.isWritable()) {
- try {
- conn.handleWriteEvent();
- } catch (IOException e) {
- conn.setLostReason(e);
- return false;
- }
- }
- return true;
- }
+ private final TransportMetrics metrics = TransportMetrics.getInstance();
+ private final ArrayList<TransportThread> threads = new ArrayList<TransportThread>();
+ private final Random rnd = new Random();
/**
* Create a new Transport object with the given fatal error
@@ -182,30 +39,33 @@ public class Transport {
*
* @param fatalHandler fatal error handler
* @param cryptoEngine crypto engine to use
+ * @param numThreads number of {@link TransportThread}s.
**/
- public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine) {
+ public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads) {
synchronized (this) {
this.fatalHandler = fatalHandler; // NB: this must be set first
}
this.cryptoEngine = cryptoEngine;
- thread = new Thread(new Run(), "<jrt-transport>");
- queue = new Queue();
- myQueue = new Queue();
connector = new Connector(this);
- worker = new Worker(this);
- scheduler = new Scheduler(System.currentTimeMillis());
- state = OPEN;
- try {
- selector = Selector.open();
- } catch (Exception e) {
- throw new Error("Could not open transport selector", e);
+ worker = new Worker(this);
+ runCnt = new AtomicInteger(numThreads);
+ for (int i = 0; i < numThreads; ++i) {
+ threads.add(new TransportThread(this));
}
- thread.setDaemon(true);
- thread.start();
}
- public Transport(CryptoEngine cryptoEngine) { this(null, cryptoEngine); }
- public Transport(FatalErrorHandler fatalHandler) { this(fatalHandler, CryptoEngine.createDefault()); }
- public Transport() { this(null, CryptoEngine.createDefault()); }
+ public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads); }
+ public Transport(FatalErrorHandler fatalHandler, int numThreads) { this(fatalHandler, CryptoEngine.createDefault(), numThreads); }
+ public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads); }
+ public Transport() { this(null, CryptoEngine.createDefault(), 1); }
+
+ /**
+ * Select a random transport thread
+ *
+ * @return a random transport thread
+ **/
+ public TransportThread selectThread() {
+ return threads.get(rnd.nextInt(threads.size()));
+ }
/**
* Use the underlying CryptoEngine to create a CryptoSocket.
@@ -257,56 +117,15 @@ public class Transport {
* @param owner the one calling this method
* @param spec the address to connect to
* @param context application context for the new connection
- * @param sync perform a synchronous connect in the calling thread
- * if this flag is set
*/
- Connection connect(Supervisor owner, Spec spec, Object context, boolean sync) {
- Connection conn = new Connection(this, owner, spec, context);
- if (sync) {
- addConnection(conn.connect());
- } else {
- connector.connectLater(conn);
- }
+ Connection connect(Supervisor owner, Spec spec, Object context) {
+ Connection conn = new Connection(selectThread(), owner, spec, context);
+ connector.connectLater(conn);
return conn;
}
- /**
- * Add a connection to the set of connections handled by this
- * Transport. Invoked by the {@link Connector} class.
- *
- * @param conn the connection to add
- **/
- void addConnection(Connection conn) {
- if (!postCommand(new AddConnectionCmd(conn))) {
- perform(new CloseConnectionCmd(conn));
- }
- }
-
- /**
- * Request an asynchronous close of a connection.
- *
- * @param conn the connection to close
- **/
- void closeConnection(Connection conn) {
- postCommand(new CloseConnectionCmd(conn));
- }
-
- /**
- * Request an asynchronous enabling of write events for a
- * connection.
- *
- * @param conn the connection to enable write events for
- **/
- void enableWrite(Connection conn) {
- if (Thread.currentThread() == thread) {
- handleEnableWrite(conn);
- } else {
- postCommand(new EnableWriteCmd(conn));
- }
- }
-
- void handshakeWorkDone(Connection conn) {
- postCommand(new HandshakeWorkDoneCmd(conn));
+ void closeLater(Connection c) {
+ worker.closeLater(c);
}
/**
@@ -320,126 +139,50 @@ public class Transport {
}
/**
- * Create a {@link Task} that can be scheduled for execution in
- * the transport thread.
- *
- * @return the newly created Task
- * @param cmd what to run when the task is executed
- **/
- public Task createTask(Runnable cmd) {
- return new Task(scheduler, cmd);
- }
-
- /**
- * Perform the given command in such a way that it does not run
- * concurrently with the transport thread or other commands
- * performed by invoking this method. This method will continue to
- * work even after the transport thread has been shut down.
- *
- * @param cmd the command to perform
- **/
- public void perform(Runnable cmd) {
- if (Thread.currentThread() == thread) {
- cmd.run();
- return;
- }
- if (!postCommand(cmd)) {
- join();
- synchronized (thread) {
- cmd.run();
- }
- }
- }
-
- /**
- * Synchronize with the transport thread. This method will block
+ * Synchronize with all transport threads. This method will block
* until all commands issued before this method was invoked has
- * completed. If the transport thread has been shut down (or is in
+ * completed. If a transport thread has been shut down (or is in
* the progress of being shut down) this method will instead wait
* for the transport thread to complete, since no more commands
* will be performed, and waiting would be forever. Invoking this
- * method from the transport thread is not a good idea.
+ * method from a transport thread is not a good idea.
*
* @return this object, to enable chaining
**/
public Transport sync() {
- SyncCmd cmd = new SyncCmd();
- if (postCommand(cmd)) {
- cmd.waitDone();
- } else {
- join();
+ for (TransportThread thread: threads) {
+ thread.sync();
}
return this;
}
- private void run() {
- while (state == OPEN) {
-
- // perform I/O selection
- try {
- selector.select(100);
- } catch (IOException e) {
- log.log(Level.WARNING, "error during select", e);
- }
-
- // handle internal events
- handleEvents();
-
- // handle I/O events
- Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- Connection conn = (Connection) key.attachment();
- keys.remove();
- if (!handleIOEvents(conn, key)) {
- handleCloseConnection(conn);
- }
- }
-
- // check scheduled tasks
- scheduler.checkTasks(System.currentTimeMillis());
- }
- connector.shutdown().waitDone();
- synchronized (this) {
- state = CLOSED;
- }
- handleEvents();
- Iterator<SelectionKey> keys = selector.keys().iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- Connection conn = (Connection) key.attachment();
- handleCloseConnection(conn);
- }
- try { selector.close(); } catch (Exception e) {}
- worker.shutdown().join();
- connector.exit().join();
- try { cryptoEngine.close(); } catch (Exception e) {}
- }
-
/**
- * Initiate controlled shutdown of the transport thread.
+ * Initiate controlled shutdown of all transport threads.
*
* @return this object, to enable chaining with join
**/
public Transport shutdown() {
- synchronized (this) {
- if (state == OPEN) {
- state = CLOSING;
- selector.wakeup();
- }
+ connector.shutdown().waitDone();
+ for (TransportThread thread: threads) {
+ thread.shutdown();
}
return this;
}
/**
- * Wait for the transport thread to finish.
+ * Wait for all transport threads to finish.
**/
public void join() {
- while (true) {
- try {
- thread.join();
- return;
- } catch (InterruptedException e) {}
+ for (TransportThread thread: threads) {
+ thread.join();
+ }
+ }
+
+ void notifyDone(TransportThread self) {
+ if (runCnt.decrementAndGet() == 0) {
+ worker.shutdown().join();
+ connector.exit().join();
+ try { cryptoEngine.close(); } catch (Exception e) {}
}
}