diff options
Diffstat (limited to 'jrt/src/com/yahoo/jrt/Transport.java')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 367 |
1 files changed, 55 insertions, 312 deletions
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 33ce6fe6ed0..f4eb1acd096 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -2,15 +2,14 @@ package com.yahoo.jrt; -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; - /** * The Transport class is the core needed to make your {@link * Supervisor} tick. It implements the reactor pattern to perform @@ -20,159 +19,17 @@ import java.util.logging.Logger; **/ public class Transport { - private static final int OPEN = 1; - private static final int CLOSING = 2; - private static final int CLOSED = 3; - - private class Run implements Runnable { - public void run() { - try { - Transport.this.run(); - } catch (Throwable problem) { - handleFailure(problem, Transport.this); - } - } - } - - private class AddConnectionCmd implements Runnable { - private Connection conn; - AddConnectionCmd(Connection conn) { this.conn = conn; } - public void run() { handleAddConnection(conn); } - } - - private class CloseConnectionCmd implements Runnable { - private Connection conn; - CloseConnectionCmd(Connection conn) { this.conn = conn; } - public void run() { handleCloseConnection(conn); } - } - - private class EnableWriteCmd implements Runnable { - private Connection conn; - EnableWriteCmd(Connection conn) { this.conn = conn; } - public void run() { handleEnableWrite(conn); } - } - - private class HandshakeWorkDoneCmd implements Runnable { - private Connection conn; - HandshakeWorkDoneCmd(Connection conn) { this.conn = conn; } - public void run() { handleHandshakeWorkDone(conn); } - } - - private class SyncCmd implements Runnable { - boolean done = false; - public synchronized void waitDone() { - while (!done) { - try { wait(); } catch (InterruptedException e) {} - } - } - public synchronized void run() { - done = true; - notify(); - } - } - private static Logger log = Logger.getLogger(Transport.class.getName()); - private FatalErrorHandler fatalHandler; // NB: this must be set first - private CryptoEngine cryptoEngine; - private Thread thread; - private Queue queue; - private Queue myQueue; - private Connector connector; - private Worker worker; - private Scheduler scheduler; - private int state; - private Selector selector; - private final TransportMetrics metrics = TransportMetrics.getInstance(); + private final FatalErrorHandler fatalHandler; // NB: this must be set first + private final CryptoEngine cryptoEngine; + private final Connector connector; + private final Worker worker; + private final AtomicInteger runCnt; - private void handleAddConnection(Connection conn) { - if (conn.isClosed()) { - if (conn.hasSocket()) { - worker.closeLater(conn); - } - return; - } - if (!conn.init(selector)) { - handleCloseConnection(conn); - } - } - - private void handleCloseConnection(Connection conn) { - if (conn.isClosed()) { - return; - } - conn.fini(); - if (conn.hasSocket()) { - worker.closeLater(conn); - } - } - - private void handleEnableWrite(Connection conn) { - if (conn.isClosed()) { - return; - } - conn.enableWrite(); - } - - private void handleHandshakeWorkDone(Connection conn) { - if (conn.isClosed()) { - return; - } - try { - conn.handleHandshakeWorkDone(); - } catch (IOException e) { - conn.setLostReason(e); - handleCloseConnection(conn); - } - } - - private boolean postCommand(Runnable cmd) { - boolean wakeup; - synchronized (this) { - if (state == CLOSED) { - return false; - } - wakeup = queue.isEmpty(); - queue.enqueue(cmd); - } - if (wakeup) { - selector.wakeup(); - } - return true; - } - - private void handleEvents() { - synchronized (this) { - queue.flush(myQueue); - } - while (!myQueue.isEmpty()) { - ((Runnable)myQueue.dequeue()).run(); - } - } - - private boolean handleIOEvents(Connection conn, - SelectionKey key) { - if (conn.isClosed()) { - return true; - } - if (key.isReadable()) { - try { - conn.handleReadEvent(); - } catch (IOException e) { - conn.setLostReason(e); - return false; - } - } - if (key.isWritable()) { - try { - conn.handleWriteEvent(); - } catch (IOException e) { - conn.setLostReason(e); - return false; - } - } - return true; - } + private final TransportMetrics metrics = TransportMetrics.getInstance(); + private final ArrayList<TransportThread> threads = new ArrayList<TransportThread>(); + private final Random rnd = new Random(); /** * Create a new Transport object with the given fatal error @@ -182,30 +39,33 @@ public class Transport { * * @param fatalHandler fatal error handler * @param cryptoEngine crypto engine to use + * @param numThreads number of {@link TransportThread}s. **/ - public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine) { + public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine, int numThreads) { synchronized (this) { this.fatalHandler = fatalHandler; // NB: this must be set first } this.cryptoEngine = cryptoEngine; - thread = new Thread(new Run(), "<jrt-transport>"); - queue = new Queue(); - myQueue = new Queue(); connector = new Connector(this); - worker = new Worker(this); - scheduler = new Scheduler(System.currentTimeMillis()); - state = OPEN; - try { - selector = Selector.open(); - } catch (Exception e) { - throw new Error("Could not open transport selector", e); + worker = new Worker(this); + runCnt = new AtomicInteger(numThreads); + for (int i = 0; i < numThreads; ++i) { + threads.add(new TransportThread(this)); } - thread.setDaemon(true); - thread.start(); } - public Transport(CryptoEngine cryptoEngine) { this(null, cryptoEngine); } - public Transport(FatalErrorHandler fatalHandler) { this(fatalHandler, CryptoEngine.createDefault()); } - public Transport() { this(null, CryptoEngine.createDefault()); } + public Transport(CryptoEngine cryptoEngine, int numThreads) { this(null, cryptoEngine, numThreads); } + public Transport(FatalErrorHandler fatalHandler, int numThreads) { this(fatalHandler, CryptoEngine.createDefault(), numThreads); } + public Transport(int numThreads) { this(null, CryptoEngine.createDefault(), numThreads); } + public Transport() { this(null, CryptoEngine.createDefault(), 1); } + + /** + * Select a random transport thread + * + * @return a random transport thread + **/ + public TransportThread selectThread() { + return threads.get(rnd.nextInt(threads.size())); + } /** * Use the underlying CryptoEngine to create a CryptoSocket. @@ -257,56 +117,15 @@ public class Transport { * @param owner the one calling this method * @param spec the address to connect to * @param context application context for the new connection - * @param sync perform a synchronous connect in the calling thread - * if this flag is set */ - Connection connect(Supervisor owner, Spec spec, Object context, boolean sync) { - Connection conn = new Connection(this, owner, spec, context); - if (sync) { - addConnection(conn.connect()); - } else { - connector.connectLater(conn); - } + Connection connect(Supervisor owner, Spec spec, Object context) { + Connection conn = new Connection(selectThread(), owner, spec, context); + connector.connectLater(conn); return conn; } - /** - * Add a connection to the set of connections handled by this - * Transport. Invoked by the {@link Connector} class. - * - * @param conn the connection to add - **/ - void addConnection(Connection conn) { - if (!postCommand(new AddConnectionCmd(conn))) { - perform(new CloseConnectionCmd(conn)); - } - } - - /** - * Request an asynchronous close of a connection. - * - * @param conn the connection to close - **/ - void closeConnection(Connection conn) { - postCommand(new CloseConnectionCmd(conn)); - } - - /** - * Request an asynchronous enabling of write events for a - * connection. - * - * @param conn the connection to enable write events for - **/ - void enableWrite(Connection conn) { - if (Thread.currentThread() == thread) { - handleEnableWrite(conn); - } else { - postCommand(new EnableWriteCmd(conn)); - } - } - - void handshakeWorkDone(Connection conn) { - postCommand(new HandshakeWorkDoneCmd(conn)); + void closeLater(Connection c) { + worker.closeLater(c); } /** @@ -320,126 +139,50 @@ public class Transport { } /** - * Create a {@link Task} that can be scheduled for execution in - * the transport thread. - * - * @return the newly created Task - * @param cmd what to run when the task is executed - **/ - public Task createTask(Runnable cmd) { - return new Task(scheduler, cmd); - } - - /** - * Perform the given command in such a way that it does not run - * concurrently with the transport thread or other commands - * performed by invoking this method. This method will continue to - * work even after the transport thread has been shut down. - * - * @param cmd the command to perform - **/ - public void perform(Runnable cmd) { - if (Thread.currentThread() == thread) { - cmd.run(); - return; - } - if (!postCommand(cmd)) { - join(); - synchronized (thread) { - cmd.run(); - } - } - } - - /** - * Synchronize with the transport thread. This method will block + * Synchronize with all transport threads. This method will block * until all commands issued before this method was invoked has - * completed. If the transport thread has been shut down (or is in + * completed. If a transport thread has been shut down (or is in * the progress of being shut down) this method will instead wait * for the transport thread to complete, since no more commands * will be performed, and waiting would be forever. Invoking this - * method from the transport thread is not a good idea. + * method from a transport thread is not a good idea. * * @return this object, to enable chaining **/ public Transport sync() { - SyncCmd cmd = new SyncCmd(); - if (postCommand(cmd)) { - cmd.waitDone(); - } else { - join(); + for (TransportThread thread: threads) { + thread.sync(); } return this; } - private void run() { - while (state == OPEN) { - - // perform I/O selection - try { - selector.select(100); - } catch (IOException e) { - log.log(Level.WARNING, "error during select", e); - } - - // handle internal events - handleEvents(); - - // handle I/O events - Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); - while (keys.hasNext()) { - SelectionKey key = keys.next(); - Connection conn = (Connection) key.attachment(); - keys.remove(); - if (!handleIOEvents(conn, key)) { - handleCloseConnection(conn); - } - } - - // check scheduled tasks - scheduler.checkTasks(System.currentTimeMillis()); - } - connector.shutdown().waitDone(); - synchronized (this) { - state = CLOSED; - } - handleEvents(); - Iterator<SelectionKey> keys = selector.keys().iterator(); - while (keys.hasNext()) { - SelectionKey key = keys.next(); - Connection conn = (Connection) key.attachment(); - handleCloseConnection(conn); - } - try { selector.close(); } catch (Exception e) {} - worker.shutdown().join(); - connector.exit().join(); - try { cryptoEngine.close(); } catch (Exception e) {} - } - /** - * Initiate controlled shutdown of the transport thread. + * Initiate controlled shutdown of all transport threads. * * @return this object, to enable chaining with join **/ public Transport shutdown() { - synchronized (this) { - if (state == OPEN) { - state = CLOSING; - selector.wakeup(); - } + connector.shutdown().waitDone(); + for (TransportThread thread: threads) { + thread.shutdown(); } return this; } /** - * Wait for the transport thread to finish. + * Wait for all transport threads to finish. **/ public void join() { - while (true) { - try { - thread.join(); - return; - } catch (InterruptedException e) {} + for (TransportThread thread: threads) { + thread.join(); + } + } + + void notifyDone(TransportThread self) { + if (runCnt.decrementAndGet() == 0) { + worker.shutdown().join(); + connector.exit().join(); + try { cryptoEngine.close(); } catch (Exception e) {} } } |