// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.io;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ArrayList;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
/**
* A basic Reactor implementation using NIO.
*
* @author Bob Travis
* @author Bjorn Borud
*
*/
public class Listener extends Thread {
/** Logger for this class. */
private static final Logger log = Logger.getLogger(Listener.class.getName());

/** Selector that every channel handled by this listener is registered with; opened in the constructor. */
private Selector selector;

/** Acceptors started by listen(), keyed by the port they listen on. */
Map<Integer, Acceptor> acceptors = new HashMap<>();

/** Connection factories for the server channels registered by listenNoAcceptor(), keyed by channel identity. */
Map<ServerSocketChannel, ConnectionFactory> factories = new IdentityHashMap<>();

/** Optional handler invoked when the select loop dies; also handed to newly created acceptors. */
private FatalErrorHandler fatalErrorHandler;

/** Hooks run after select() returns ready keys and before they are dispatched; null until the first hook is added. */
private List<SelectLoopHook> selectLoopPreHooks;

/** Hooks run after all ready keys have been dispatched; null until the first hook is added. */
private List<SelectLoopHook> selectLoopPostHooks;

/** Connections waiting to be registered with the selector by the listener thread; guarded by its own monitor. */
private final LinkedList<Connection> newConnections = new LinkedList<>();

/** Queue of pending SelectionKey interest-set updates; guarded by its own monitor. */
private final LinkedList<UpdateInterest> modifyInterestOpsQueue = new LinkedList<>();
/**
 * Creates a listener whose thread is named {@code "Listener-" + name} and
 * opens the selector it will run its select loop on.
 *
 * @param name suffix for the thread name; also used in the creation log message
 * @throws RuntimeException wrapping the IOException if the selector cannot be opened
 */
public Listener(String name) {
    super("Listener-" + name);
    try {
        this.selector = Selector.open();
    } catch (IOException openFailure) {
        // Surface the failure unchecked, keeping the original cause.
        throw new RuntimeException(openFailure);
    }
    log.fine(name + " listener created " + this);
}
/**
 * Registers the handler that is notified when the select loop terminates
 * with an error. Acceptors created by later calls to listen() are given
 * the same handler.
 *
 * @param f the FatalErrorHandler instance to register
 */
public synchronized void setFatalErrorHandler(FatalErrorHandler f) {
    this.fatalErrorHandler = f;
}
/**
 * Adds a hook that is run after select() has returned ready keys and
 * before those keys are dispatched. Not thread-safe: call this during
 * initial setup, before the listener is started.
 *
 * @param hook the hook to append to the pre-hook list
 */
public void addSelectLoopPreHook(SelectLoopHook hook) {
    // The list is created lazily on the first registration.
    if (null == selectLoopPreHooks) {
        selectLoopPreHooks = new ArrayList<>(5);
    }
    selectLoopPreHooks.add(hook);
}
/**
 * Adds a hook that is run after all ready keys of a select() round have
 * been dispatched. Not thread-safe: call this during initial setup,
 * before the listener is started.
 *
 * @param hook the hook to append to the post-hook list
 */
public void addSelectLoopPostHook(SelectLoopHook hook) {
    // The list is created lazily on the first registration.
    if (null == selectLoopPostHooks) {
        selectLoopPostHooks = new ArrayList<>(5);
    }
    selectLoopPostHooks.add(hook);
}
/**
 * Invokes every registered pre hook, in registration order, passing
 * {@code true} to mark the call as the pre-dispatch phase. Does nothing
 * when no pre hook has been added.
 */
private void runSelectLoopPreHooks() {
    if (selectLoopPreHooks != null) {
        for (SelectLoopHook hook : selectLoopPreHooks) {
            hook.selectLoopHook(true);
        }
    }
}
/**
 * Invokes every registered post hook, in registration order, passing
 * {@code false} to mark the call as the post-dispatch phase. Does nothing
 * when no post hook has been added.
 */
private void runSelectLoopPostHooks() {
    if (selectLoopPostHooks != null) {
        for (SelectLoopHook hook : selectLoopPostHooks) {
            hook.selectLoopHook(false);
        }
    }
}
/**
 * Starts listening on a port through a dedicated Acceptor, which hands
 * accepted connections back to this listener. At most one acceptor is
 * kept per port; a second request for the same port only logs a warning.
 *
 * @param factory the connection factory for new connections on this port
 * @param port    the port to listen on
 * @throws IOException declared for failures while setting up the acceptor
 */
public synchronized void listen(ConnectionFactory factory, int port)
        throws IOException {
    if (!acceptors.containsKey(port)) {
        Acceptor acceptor = new Acceptor(this, factory, port);

        // The acceptor inherits the listener's fatal error handling, if any.
        if (fatalErrorHandler != null) {
            acceptor.setFatalErrorHandler(fatalErrorHandler);
        }

        acceptor.listen().start();
        acceptors.put(port, acceptor);
    } else {
        // Only one acceptor per listen port.
        log.warning("Already listening to port=" + port);
    }
}
/**
 * Starts listening on a port without a separate acceptor thread: the
 * server channel is registered for OP_ACCEPT with this listener's own
 * selector, so connections are accepted from the select loop.
 *
 * <p>If configuring, binding or registering the channel fails, the
 * channel is closed and its factory mapping removed before the
 * exception propagates, so a failed call leaves nothing behind.
 *
 * @param factory the connection factory for new connections on this port
 * @param port    the port to listen on (bound on the wildcard address)
 * @throws IOException if the server channel cannot be opened, configured,
 *                     bound or registered
 */
public synchronized void listenNoAcceptor(ConnectionFactory factory, int port)
        throws IOException {
    ServerSocketChannel s = ServerSocketChannel.open();
    boolean registered = false;
    try {
        s.configureBlocking(false);
        s.socket().setReuseAddress(true);
        s.socket().bind(new InetSocketAddress(port)); // use non-specific IP

        // The factory must be known before the key can become acceptable.
        factories.put(s, factory);
        s.register(selector, SelectionKey.OP_ACCEPT);
        registered = true;
    } finally {
        if (!registered) {
            factories.remove(s);
            try {
                s.close();
            } catch (IOException closeFailure) {
                // Logged, not thrown, so the original failure is not masked.
                log.log(Level.WARNING, "channel close failed", closeFailure);
            }
        }
    }

    // getHostName() may trigger a reverse name lookup; skip it unless the
    // message will actually be logged.
    if (log.isLoggable(Level.FINE)) {
        String host = s.socket().getInetAddress().getHostName();
        log.fine("listener " + host + ":" + port);
    }
}
// ==================================================================
// ==================================================================
// ==================================================================
/**
 * Preferred way of modifying interest ops: takes a Connection rather
 * than a SelectionKey, so the key is always the one under which the
 * connection's channel is registered with this listener's selector.
 * The change is enqueued and the selector is woken up.
 *
 * <p>Per the SelectableChannel.keyFor contract the looked-up key is
 * null when the channel is not registered with this selector.
 *
 * @param connection the connection whose interest set should change
 * @param op         the interest operation bit(s) concerned
 * @param set        forwarded unchanged to the queued update
 * @return {@code this}, for chaining
 */
public Listener modifyInterestOps(Connection connection,
                                  int op, boolean set) {
    SelectionKey registeredKey = connection.socketChannel().keyFor(selector);
    return modifyInterestOps(registeredKey, op, set);
}
/**
 * Batch version of modifyInterestOps() for a Connection: the change is
 * enqueued for the key under which the connection's channel is
 * registered with this listener's selector, but the selector is not
 * woken up. Call modifyInterestOpsDone() once the batch is complete.
 *
 * @param connection the connection whose interest set should change
 * @param op         the interest operation bit(s) concerned
 * @param set        forwarded unchanged to the queued update
 * @return {@code this}, for chaining
 */
public Listener modifyInterestOpsBatch(Connection connection,
                                       int op, boolean set) {
    SelectionKey registeredKey = connection.socketChannel().keyFor(selector);
    return modifyInterestOpsBatch(registeredKey, op, set);
}
/**
 * Enqueues a change to the interest set of a SelectionKey and wakes up
 * the selector so the listener thread applies it.
 *
 * <p>This exists as a workaround: updating a key's interest set from
 * another thread while a select is in progress can block, so requests
 * are queued here and carried out in the thread where select runs.
 *
 * @param key the key whose interest set should change
 * @param op  the interest operation bit(s) concerned
 * @param set forwarded unchanged to the queued update
 * @return {@code this}, for chaining
 */
public Listener modifyInterestOps(SelectionKey key, int op, boolean set) {
    UpdateInterest request = new UpdateInterest(key, op, set);
    synchronized (modifyInterestOpsQueue) {
        modifyInterestOpsQueue.add(request);
    }
    // Make a blocked select() return so the queue gets drained.
    selector.wakeup();
    return this;
}
/**
 * Does the same as modifyInterestOps(), but does not call wakeup on
 * the selector, so that several modifications can be queued before the
 * selector is woken with modifyInterestOpsDone().
 *
 * @param key the key whose interest set should change
 * @param op  the interest operation bit(s) concerned
 * @param set forwarded unchanged to the queued update
 * @return {@code this}, for chaining
 */
public Listener modifyInterestOpsBatch(SelectionKey key,
                                       int op,
                                       boolean set) {
    UpdateInterest request = new UpdateInterest(key, op, set);
    synchronized (modifyInterestOpsQueue) {
        modifyInterestOpsQueue.add(request);
    }
    return this;
}
/**
 * Signals that a batch of SelectionKey updates has been queued and
 * wakes up the selector so they get processed. Also see
 * modifyInterestOps().
 *
 * @return {@code this}, for chaining
 */
public Listener modifyInterestOpsDone() {
    this.selector.wakeup();
    return this;
}
/**
 * Applies all queued SelectionKey changes in FIFO order. The queue lock
 * is held for the whole drain. Also see modifyInterestOps().
 */
private void processModifyInterestOps() {
    synchronized (modifyInterestOpsQueue) {
        for (;;) {
            if (modifyInterestOpsQueue.isEmpty()) {
                break;
            }
            UpdateInterest pending = modifyInterestOpsQueue.removeFirst();
            pending.doUpdate();
        }
    }
}
// ==================================================================
// ==================================================================
// ==================================================================
/**
 * Thread entry point: runs the select loop until it returns or fails.
 *
 * <p>Any Throwable escaping the loop is passed to the registered
 * FatalErrorHandler. When no handler is registered the failure is
 * logged at SEVERE instead of being dropped, so the death of the
 * listener thread never goes unnoticed.
 */
@Override
public void run() {
    log.fine("Started listener");
    try {
        selectLoop();
    } catch (Throwable t) {
        // Read the field once so the null check and the call see the same handler.
        FatalErrorHandler handler = fatalErrorHandler;
        if (handler != null) {
            handler.handle(t, null);
        } else {
            log.log(Level.SEVERE, "Listener terminated by unhandled error", t);
        }
    }
}
/**
 * Main reactor loop. Until the thread is interrupted it repeatedly
 * registers newly queued connections, applies queued interest-set
 * changes, blocks in select(), and dispatches each ready key.
 *
 * <p>A select() that returns no keys (for example after a wakeup)
 * restarts the loop without running the hooks. An IOException from
 * select() is logged and ends the loop.
 */
private void selectLoop() {
    while (!Thread.currentThread().isInterrupted()) {
        processNewConnections();
        processModifyInterestOps();

        try {
            if (selector.select() == 0) {
                continue;
            }
        } catch (IOException e) {
            log.log(Level.WARNING, "error during select", e);
            return;
        }

        runSelectLoopPreHooks();

        Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
        while (ready.hasNext()) {
            SelectionKey key = ready.next();
            // A key must be removed from the selected set by hand, or it
            // would be reported again on the next round.
            ready.remove();
            dispatchReadyKey(key);
        }

        runSelectLoopPostHooks();
    }
}

/**
 * Runs the handlers matching the ready operations of one key, in the
 * order read, write, connect, accept. The key is re-validated after each
 * handler because a handler may close the connection and thereby cancel it.
 */
private void dispatchReadyKey(SelectionKey key) {
    if (!key.isValid()) {
        return;
    }
    if (key.isReadable()) {
        performRead(key);
        if (!key.isValid()) {
            return;
        }
    }
    if (key.isWritable()) {
        performWrite(key);
        if (!key.isValid()) {
            return;
        }
    }
    if (key.isConnectable()) {
        performConnect(key);
        if (!key.isValid()) {
            return;
        }
    }
    if (key.isAcceptable()) {
        performAccept(key);
    }
}
/**
 * Hands a newly accepted connection over to the Listener. The
 * connection is queued and the selector woken up; the actual
 * registration happens later in processNewConnections() on the
 * listener thread. According to the original author's note this method
 * is called in the context of the Acceptor thread, and how much work is
 * done here versus on the listener thread is a tradeoff that may need
 * to be re-evaluated.
 *
 * <p>If the channel is still blocking it is switched to non-blocking
 * first. Should that fail, the channel is closed, the connection is
 * not queued, and the connection is returned as-is.
 *
 * @param newConn the connection to hand over
 * @return the same connection that was passed in
 */
public Connection addNewConnection(Connection newConn) {
    // Ensure nonblocking. Strictly redundant, but older versions of this
    // method set the connection nonblocking and clients might still
    // expect this behavior.
    SocketChannel channel = newConn.socketChannel();
    if (channel.isBlocking()) {
        try {
            channel.configureBlocking(false);
        } catch (java.nio.channels.IllegalBlockingModeException | IOException e) {
            log.log(Level.SEVERE, "Unable to set nonblocking", e);
            // Both failure modes leave a channel the selector cannot use;
            // close it so the socket is not leaked.
            try {
                channel.close();
            } catch (IOException ee) {
                log.log(Level.WARNING, "channel close failed", ee);
            }
            return newConn;
        }
    }

    synchronized (newConnections) {
        newConnections.addLast(newConn);
    }
    // Make a blocked select() return so the connection gets registered.
    selector.wakeup();
    return newConn;
}
/**
 * Called from selectLoop() to register every queued connection with
 * the selector, using the connection's own select ops and attaching
 * the connection to its key.
 *
 * <p>A connection whose channel is already closed is logged and
 * skipped; the rest of the queue is still processed. Stopping at the
 * first failure would leave the remaining connections unregistered
 * until some unrelated event made select() return again.
 */
private synchronized void processNewConnections() {
    synchronized (newConnections) {
        while (!newConnections.isEmpty()) {
            Connection conn = newConnections.removeFirst();
            try {
                conn.socketChannel().register(selector, conn.selectOps(),
                                              conn);
            } catch (ClosedChannelException e) {
                // Skip this connection only; keep draining the queue.
                log.log(Level.WARNING, "register channel failed", e);
            }
        }
    }
}
/**
 * Accepts new connections on the server channel behind the given key.
 * Loops over accept() until it yields no more connections. If an error
 * occurs after a successful accept, the socket in question is discarded
 * and the loop goes on to the next pending connection. A failing
 * accept() itself ends the loop.
 *
 * @param key the acceptable key; its channel is a ServerSocketChannel
 */
private void performAccept(SelectionKey key) {
    if (Thread.currentThread().isInterrupted()) {
        return;
    }

    for (;;) {
        ServerSocketChannel ssChannel;
        SocketChannel channel;
        try {
            ssChannel = (ServerSocketChannel) key.channel();
            channel = ssChannel.accept();
        } catch (java.io.IOException e) {
            log.log(Level.WARNING, "accept failed", e);
            return;
        }

        // No connection pending (any more): we are done.
        if (channel == null) {
            return;
        }

        // Set nonblocking; on failure discard this socket and try the next.
        boolean nonBlocking = false;
        try {
            channel.configureBlocking(false);
            nonBlocking = true;
        } catch (java.nio.channels.IllegalBlockingModeException e) {
            log.log(Level.SEVERE, "Unable to set nonblocking", e);
        } catch (java.io.IOException e) {
            log.log(Level.WARNING, "IO error occurred", e);
        }
        if (!nonBlocking) {
            try {
                channel.close();
            } catch (java.io.IOException ee) {
                log.log(Level.WARNING, "channel close failed", ee);
            }
            continue;
        }

        ConnectionFactory factory = factories.get(ssChannel);
        Connection conn = factory.newConnection(channel, this);
        try {
            channel.register(selector, conn.selectOps(), conn);
        } catch (java.nio.channels.ClosedChannelException e) {
            log.log(Level.WARNING, "register channel failed", e);
        }
    }
}
/**
 * Completes an asynchronous connect operation on the connection
 * attached to the key. Does nothing if the current thread has been
 * interrupted. Note that asynchronous connect does not work properly
 * in 1.4, so this should not be used on anything older than 1.5/5.0.
 *
 * <p>If connect() fails the failure is logged and the connection is
 * closed; a failure of that close is logged with its own exception.
 *
 * @param key the connectable key; its attachment is the Connection
 */
private void performConnect(SelectionKey key) {
    if (Thread.currentThread().isInterrupted()) {
        return;
    }
    Connection c = (Connection) key.attachment();
    try {
        c.connect();
    } catch (IOException e) {
        log.log(Level.FINE, "connect failed", e);
        try {
            c.close();
        } catch (IOException e2) {
            // Report the close failure itself, not the connect failure again.
            log.log(Level.FINE, "close failed", e2);
        }
    }
}
/**
 * Performs a read on the connection attached to a key that is ready
 * for reading. Does nothing if the current thread has been interrupted.
 *
 * <p>If read() fails the failure is logged and the connection is
 * closed; a failure of that close is logged with its own exception.
 *
 * @param key the readable key; its attachment is the Connection
 */
private void performRead(SelectionKey key) {
    if (Thread.currentThread().isInterrupted()) {
        return;
    }
    Connection c = (Connection) key.attachment();
    try {
        c.read();
    } catch (IOException e) {
        log.log(Level.FINE, "read failed", e);
        try {
            c.close();
        } catch (IOException e2) {
            // Report the close failure itself, not the read failure again.
            log.log(Level.FINE, "close failed", e2);
        }
    }
}
/**
 * Performs write operation(s) on the connection attached to a key that
 * is ready for writing. Does nothing if the current thread has been
 * interrupted.
 *
 * <p>If write() fails the failure is logged and the connection is
 * closed; a failure of that close is logged as well, matching the
 * handling in performRead() and performConnect().
 *
 * @param key the writable key; its attachment is the Connection
 */
private void performWrite(SelectionKey key) {
    if (Thread.currentThread().isInterrupted()) {
        return;
    }
    Connection c = (Connection) key.attachment();
    try {
        c.write();
    } catch (IOException e) {
        log.log(Level.FINE, " write failed", e);
        try {
            c.close();
        } catch (IOException e2) {
            // Best effort: nothing more can be done, but leave a trace.
            log.log(Level.FINE, "close failed", e2);
        }
    }
}
// ============================================================
// ==== connections made outside listener
// ============================================================
/**
 * Registers a connection that was set up outside the listener,
 * typically one created by actively connecting somewhere. The
 * connection is queued and the selector woken up; the registration
 * itself is carried out by the listener thread.
 *
 * @param connection the externally created connection to register
 */
public void registerConnection(Connection connection) {
    synchronized (newConnections) {
        newConnections.add(connection);
    }
    // Make a blocked select() return so the queue gets processed.
    this.selector.wakeup();
}
/**
 * Perform clean shutdown of Listener.
 *
 * NOTE: not implemented yet. The method body is empty, so calling it
 * currently has no effect. The comments in the body list the steps a
 * real implementation is intended to take.
 *
 * TODO: implement
 */
public void shutdown() {// make writing impossible
// make listening on new ports impossible
// close all listening connections (kill all listener threads)
// flush outbound data if the connection wants it
// close all connections
// have some sort of grace-period before forcibly shutting down
}
}