diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/mplex')
6 files changed, 1236 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java new file mode 100644 index 00000000000..12f8e9e387d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -0,0 +1,449 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.PacketDumper; +import com.yahoo.fs4.PacketListener; +import com.yahoo.fs4.PacketNotificationsBroadcaster; +import com.yahoo.fs4.PacketQueryTracer; +import com.yahoo.io.Connection; +import com.yahoo.io.ConnectionFactory; +import com.yahoo.io.Listener; +import com.yahoo.vespa.defaults.Defaults; +import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.concurrent.ConcurrentResourcePool; +import com.yahoo.yolean.concurrent.ResourceFactory; +import com.yahoo.yolean.concurrent.ResourcePool; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author Bjorn Borud + */ +public class Backend implements ConnectionFactory { + + private static int DEFAULT_BUFFER_SIZE = 0x8000; + + public static final class BackendStatistics { + + public final int activeConnections; + public final int passiveConnections; + + public BackendStatistics(int activeConnections, int passiveConnections) { + this.activeConnections = activeConnections; + this.passiveConnections = passiveConnections; + } + + @Override + public String toString() { + return activeConnections + "/" + totalConnections(); + } + + public int totalConnections() { + return activeConnections + passiveConnections; + } + } + + private static final Logger log = Logger.getLogger(Backend.class.getName()); + + private final ListenerPool listeners; + private final InetSocketAddress address; + private final String host; + private final int port; + private final Map<Integer, FS4Channel> activeChannels = new HashMap<>(); + private int channelId = 0; + private boolean shutdownInitiated = false; + + /** Whether we are currently in the state of not being able to connect, to avoid repeated logging */ + private boolean areInSocketNotConnectableState = false; + + private final LinkedList<FS4Channel> pingChannels = new LinkedList<>(); + private final PacketListener packetListener; + private final ConnectionPool connectionPool; + private final PacketDumper packetDumper; + private final AtomicInteger connectionCount = new AtomicInteger(0); + private final ConcurrentResourcePool<ByteBuffer> byteBufferResourcePool = new ConcurrentResourcePool<>(new ResourceFactory<>() { + @Override + public ByteBuffer create() { + return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + } + }); + + /** + * For unit testing. do not use + */ + protected Backend() { + listeners = null; + host = null; + port = 0; + packetListener = null; + packetDumper = null; + address = null; + connectionPool = new ConnectionPool(); + } + + public Backend(String host, + int port, + String serverDiscriminator, + ListenerPool listenerPool, + ConnectionPool connectionPool) { + String fileNamePattern = "qrs." + serverDiscriminator + '.' + host + ":" + port + ".%s" + ".dump"; + packetDumper = new PacketDumper(new File(Defaults.getDefaults().underVespaHome("logs/vespa/qrs/")), + fileNamePattern); + packetListener = new PacketNotificationsBroadcaster(packetDumper, new PacketQueryTracer()); + this.listeners = listenerPool; + this.host = host; + this.port = port; + address = new InetSocketAddress(host, port); + this.connectionPool = connectionPool; + } + + private void logWarning(String attemptDescription, Exception e) { + log.log(Level.WARNING, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); + } + + private void logInfo(String attemptDescription, Exception e) { + log.log(Level.INFO, "Exception on " + attemptDescription + " '" + host + ":" + port + "': " + Exceptions.toMessageString(e)); + } + + // ============================================================ + // ==== connection pool stuff + // ============================================================ + + /** + * Fetch a connection from the connection pool. If the pool + * is empty we create a connection. + */ + private FS4Connection getConnection() throws IOException { + FS4Connection connection = connectionPool.getConnection(); + if (connection == null) { + // if pool was empty create one: + connection = createConnection(); + } + return connection; + } + + ConcurrentResourcePool<ByteBuffer> getBufferPool() { + return byteBufferResourcePool; + } + + /** + * Return a connection to the connection pool. If the + * connection is not valid anymore we drop it, ie. do not + * put it into the pool. + */ + public void returnConnection(FS4Connection connection) { + connectionPool.releaseConnection(connection); + } + + /** + * Create a new connection to the target for this backend. + */ + private FS4Connection createConnection() throws IOException { + SocketChannel socket = SocketChannel.open(); + try { + connectSocket(socket); + } catch (Exception e) { + // was warning, see VESPA-1922 + if ( ! areInSocketNotConnectableState) { + logInfo("connecting to", e); + } + areInSocketNotConnectableState = true; + socket.close(); + return null; + } + areInSocketNotConnectableState = false; + int listenerId = connectionCount.getAndIncrement()%listeners.size(); + Listener listener = listeners.get(listenerId); + FS4Connection connection = new FS4Connection(socket, listener, this, packetListener); + listener.registerConnection(connection); + + log.fine("Created new connection to " + host + ":" + port); + connectionPool.createdConnection(); + return connection; + } + + private void connectSocket(SocketChannel socket) throws IOException { + socket.configureBlocking(false); + + boolean connected = socket.connect(address); + + // wait for connection + if (!connected) { + long timeBarrier = System.currentTimeMillis() + 20L; + while (true) { + try { + Thread.sleep(5L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Received InterruptedException while waiting for socket to connect.", e); + } + // don't care whether it's spurious wakeup + connected = socket.finishConnect(); + if (connected || System.currentTimeMillis() > timeBarrier) { + break; + } + } + } + + // did we get a connection? + if ( !connected) { + throw new IllegalArgumentException("Could not create connection to dispatcher on " + + address.getHostName() + ":" + address.getPort()); + } + socket.socket(). + setTcpNoDelay(true); + } + + + //============================================================ + //==== channel management + //============================================================ + + /** Opens a new channel to fdispatch. Analogous to the "Channel" concept as used in FS4. */ + public FS4Channel openChannel() { + int cachedChannelId; + synchronized (this) { + if (channelId >= ((1 << 31) - 2)) { + channelId = 0; + } + cachedChannelId = channelId; + channelId += 2; + } + Integer id = cachedChannelId; + FS4Channel chan = new FS4Channel(this, id); + synchronized (activeChannels) { + activeChannels.put(id, chan); + } + return chan; + } + + public FS4Channel openPingChannel() { + FS4Channel chan = FS4Channel.createPingChannel(this); + synchronized (pingChannels) { + pingChannels.add(chan); + } + return chan; + } + + /** + * Get the remote address for this Backend. This method + * has package access only, because it is really only of + * importance to FS4Channel for writing slightly more sensible + * log messages. + * @return Returns the address (host, port) for this Backend. + */ + InetSocketAddress getAddress() { + return address; + } + + /** + * Get an active channel by id. + * + * @param id the (fs4) channel id + * @return returns the (fs4) channel associated with this id + * or <code>null</code> if the channel is not in the + * set of active channels. + */ + public FS4Channel getChannel(Integer id) { + synchronized (activeChannels) { + return activeChannels.get(id); + } + } + + /** + * Return the first channel in the queue waiting for pings or + * <code>null</code> if none. + */ + public FS4Channel getPingChannel() { + synchronized (pingChannels) { + return (pingChannels.isEmpty()) ? null : pingChannels.getFirst(); + } + } + + /** + * Get an active channel by id. This is a wrapper for the method + * that takes the id as an Integer. + * + * @param id The (fs4) channel id + * @return Returns the (fs4) channel associated with this id + * or <code>null</code> if the channel is not in the + * set of active channels. + */ + public FS4Channel getChannel(int id) { + return getChannel(Integer.valueOf(id)); + } + + /** + * Remove a channel. We do not want this method to be called + * directly by the client -- removal of channels should be done + * by calling the close() method of the channel. + * + * @param id The (fs4) channel id + * @return Removes and returns the (fs4) channel associated + * with this id or <code>null</code> if the channel is + * not in the set of active channels. + */ + protected FS4Channel removeChannel(Integer id) { + synchronized (activeChannels) { + return activeChannels.remove(id); + } + } + + /** + * Remove a ping channel. We do not want this method to be called + * directly by the client -- removal of channels should be done + * by calling the close() method of the channel. + * + * @return Removes and returns the (fs4) channel first in + * the queue of ping channels or <code>null</code> + * if there are no active ping channels. + */ + protected FS4Channel removePingChannel() { + synchronized (pingChannels) { + if (pingChannels.isEmpty()) + return null; + return pingChannels.removeFirst(); + } + } + //============================================================ + //==== packet sending and reception + //============================================================ + + protected boolean sendPacket(BasicPacket packet, Integer channelId) throws IOException { + if (shutdownInitiated) { + log.fine("Tried to send packet after shutdown initiated. Ignored."); + return false; + } + + FS4Connection connection = null; + try { + connection = getConnection(); + if (connection == null) { + return false; + } + connection.sendPacket(packet, channelId); + } + finally { + if (connection != null) { + returnConnection(connection); + } + } + + return true; + } + + /** + * When a connection receives a packet, it uses this method to + * dispatch the packet to the right FS4Channel. If the corresponding + * FS4Channel does not exist the packet is dropped and a message is + * logged saying so. + */ + protected void receivePacket(BasicPacket packet) { + FS4Channel fs4; + if (packet.hasChannelId()) + fs4 = getChannel(((Packet)packet).getChannel()); + else + fs4 = getPingChannel(); + + // channel does not exist + if (fs4 == null) { + return; + } + try { + fs4.addPacket(packet); + } + catch (InterruptedException e) { + log.info("Interrupted during packet adding. Packet = " + packet.toString()); + Thread.currentThread().interrupt(); + } + catch (InvalidChannelException e) { + log.log(Level.WARNING, "Channel was invalid. Packet = " + packet.toString() + + " Backend probably sent data pertaining an old request," + + " system may be overloaded."); + } + } + + /** + * Attempt to establish a connection without sending messages and then + * return it to the pool. The assumption is that if the probing is + * successful, the connection will be used soon after. There should be + * minimal overhead since the connection is cached. + */ + public boolean probeConnection() { + if (shutdownInitiated) { + return false; + } + + FS4Connection connection = null; + try { + connection = getConnection(); + } catch (IOException ignored) { + // connection is null + } finally { + if (connection != null) { + returnConnection(connection); + } + } + + return connection != null; + } + + /** + * This method should be used to ensure graceful shutdown of the backend. + */ + public void shutdown() { + log.fine("shutting down"); + if (shutdownInitiated) { + throw new IllegalStateException("Shutdown already in progress"); + } + shutdownInitiated = true; + } + + public void close() { + for (Connection c = connectionPool.getConnection(); c != null; c = connectionPool.getConnection()) { + try { + c.close(); + } catch (IOException e) { + logWarning("closing", e); + } + } + } + + /** + * Connection factory used by the Listener class. + */ + public Connection newConnection(SocketChannel channel, Listener listener) { + return new FS4Connection(channel, listener, this, packetListener); + } + + public String toString () { + return("Backend/" + host + ":" + port); + } + + public BackendStatistics getStatistics() { + synchronized (connectionPool) { //ensure consistent values + return new BackendStatistics(connectionPool.activeConnections(), connectionPool.passiveConnections()); + } + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java new file mode 100644 index 00000000000..e84adfbef2c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/ConnectionPool.java @@ -0,0 +1,90 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import java.util.Iterator; +import java.util.Timer; +import java.util.TimerTask; + +import com.yahoo.log.LogLevel; +/** + * Pool of FS4 connections. + * + * @author Tony Vaagenes + */ +public class ConnectionPool { + + private final static int CLEANINGPERIOD = 1000; // Execute every second + private final Queue<FS4Connection> connections = new ConcurrentLinkedQueue<>(); + private final AtomicInteger activeConnections = new AtomicInteger(0); + private final AtomicInteger passiveConnections = new AtomicInteger(0); + private static final Logger log = Logger.getLogger(ConnectionPool.class.getName()); + + class PoolCleanerTask extends TimerTask { + private final ConnectionPool connectionPool; + public PoolCleanerTask(ConnectionPool connectionPool) { + this.connectionPool = connectionPool; + } + + public void run() { + try { + connectionPool.dropInvalidConnections(); + } catch (Exception e) { + log.log(LogLevel.WARNING, + "Caught exception in connection pool cleaner, ignoring.", + e); + } + } + } + + public ConnectionPool() { + } + + public ConnectionPool(Timer timer) { + timer.schedule(new PoolCleanerTask(this), CLEANINGPERIOD, CLEANINGPERIOD); + } + + private void dropInvalidConnections() { + for (Iterator<FS4Connection> i = connections.iterator(); i.hasNext();) { + FS4Connection connection = i.next(); + if (!connection.isValid()) { + i.remove(); + } + } + } + + private FS4Connection registerAsActiveIfNonZero(FS4Connection connection) { + activeConnections.incrementAndGet(); + passiveConnections.decrementAndGet(); + return connection; + } + + public FS4Connection getConnection() { + return registerAsActiveIfNonZero(connections.poll()); + } + + void releaseConnection(FS4Connection connection) { + assert(connection != null); + activeConnections.decrementAndGet(); + if (connection.isValid()) { + passiveConnections.incrementAndGet(); + connections.add(connection); + } + } + + void createdConnection() { + activeConnections.incrementAndGet(); + } + + int activeConnections() { + return activeConnections.get(); + } + + //unused connections in the pool + int passiveConnections() { + return passiveConnections.get(); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java new file mode 100644 index 00000000000..adfc63d02f7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java @@ -0,0 +1,255 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.Packet; +import com.yahoo.search.Query; +import com.yahoo.search.dispatch.ResponseMonitor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * This class is used to represent a "channel" in the FS4 protocol. + * A channel represents a session between a client and the fdispatch. + * Internally this class has a response queue used by the backend + * for queueing up FS4 packets that belong to this channel (or + * <em>session</em>, which might be a more appropriate name for it). + * Outbound packets are handed off to the FS4Connection. + * + * @author Bjorn Borud + */ +public class FS4Channel { + + private static Logger log = Logger.getLogger(FS4Channel.class.getName()); + + private Integer channelId; + private Backend backend; + volatile private BlockingQueue<BasicPacket> responseQueue; + private Query query; + private boolean isPingChannel = false; + private ResponseMonitor<FS4Channel> monitor; + + /** for unit testing. do not use */ + protected FS4Channel () { + } + + protected FS4Channel(Backend backend, Integer channelId) { + this.channelId = channelId; + this.backend = backend; + this.responseQueue = new LinkedBlockingQueue<>(); + } + + static public FS4Channel createPingChannel(Backend backend) { + FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0)); + pingChannel.isPingChannel = true; + return pingChannel; + } + + /** Set the query currently associated with this channel */ + public void setQuery(Query query) { + this.query = query; + } + + /** Get the query currently associated with this channel */ + public Query getQuery() { + return query; + } + + /** Returns the (fs4) channel id */ + public Integer getChannelId () { + return channelId; + } + + /** + * Closes the channel + */ + public void close () { + BlockingQueue<BasicPacket> q = responseQueue; + responseQueue = null; + query = null; + if (isPingChannel) { + backend.removePingChannel(); + } else { + backend.removeChannel(channelId); + } + if (q != null) { + q.clear(); + } + } + + /** + * Legacy interface. + */ + public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { + ensureValid(); + return backend.sendPacket(packet, channelId); + } + + /** + * Receives the given number of packets and returns them, OR + * <ul> + * <li>Returns a smaller number of packets if an error or eol packet is received + * <li>Throws a ChannelTimeoutException if timeout occurs before all packets + * are received. Packets received with the wrong channel id are ignored. + * </ul> + * + * @param timeout the number of ms to attempt to get packets before throwing an exception + * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error + */ + public BasicPacket[] receivePackets(long timeout, int packetCount) + throws InvalidChannelException, ChannelTimeoutException { + ensureValid(); + + List<BasicPacket> packets = new ArrayList<>(12); + long startTime = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeout; + + try { + while (timeLeft >= 0) { + BasicPacket p = nextPacket(timeLeft); + if (p == null) throw new ChannelTimeoutException("Timed out"); + + if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) { + log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId()); + continue; + } + + packets.add(p); + if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) { + BasicPacket[] packetArray = new BasicPacket[packets.size()]; + packets.toArray(packetArray); + return packetArray; + } + + // doing this last might save us one system call for the last + // packet. + timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime); + } + } + catch (InvalidChannelException e) { + // nop. if we get this we want to return the default + // zero length packet array indicating that we have no + // valid response + log.info("FS4Channel was invalid. timeLeft=" + + timeLeft + ", timeout=" + timeout); + } + catch (InterruptedException e) { + log.info("FS4Channel was interrupted. timeLeft=" + + timeLeft + ", timeout=" + timeout); + Thread.currentThread().interrupt(); + } + + // default return, we only hit this if we timed out and + // did not get the end of the packet stream + throw new ChannelTimeoutException(); + } + + private static boolean hasEnoughPackets(int packetCount,List<BasicPacket> packets) { + if (packetCount<0) return false; + return packets.size()>=packetCount; + } + + /** + * Returns true if we will definitely receive more packets on this stream + * + * Shouldn't that be "_not_ receive more packets"? + */ + private static boolean isLastPacket (BasicPacket packet) { + if (packet instanceof com.yahoo.fs4.ErrorPacket) return true; + if (packet instanceof com.yahoo.fs4.EolPacket) return true; + if (packet instanceof com.yahoo.fs4.PongPacket) return true; + return false; + } + + /** + * Return the next available packet from the response queue. If there + * are no packets available we wait a maximum of <code>timeout</code> + * milliseconds before returning a <code>null</code> + * + * @param timeout Number of milliseconds to wait for a packet + * to become available. + * + * @return Returns the next available <code>BasicPacket</code> or + * <code>null</code> if we timed out. + */ + public BasicPacket nextPacket(long timeout) + throws InterruptedException, InvalidChannelException + { + return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS); + } + + /** + * Add incoming packet to the response queue. This is to be used + * by the listener for placing incoming packets in the response + * queue. + * + * @param packet BasicPacket to be placed in the response queue. + * + */ + protected void addPacket (BasicPacket packet) + throws InterruptedException, InvalidChannelException + { + ensureValidQ().put(packet); + notifyMonitor(); + } + + /** + * A valid FS4Channel is one that has not yet been closed. + * + * @return Returns <code>true</code> if the FS4Channel is valid. + */ + public boolean isValid () { + return responseQueue != null; + } + + /** + * This method is called whenever we want to perform an operation + * which assumes that the FS4Channel object is valid. An exception + * is thrown if the opposite turns out to be the case. + * + * @throws InvalidChannelException if the channel is no longer valid. + */ + private void ensureValid () throws InvalidChannelException { + if (isValid()) { + return; + } + throw new InvalidChannelException("Channel is no longer valid"); + } + + /** + * This method is called whenever we want to perform an operation + * which assumes that the FS4Channel object is valid. An exception + * is thrown if the opposite turns out to be the case. + * + * @throws InvalidChannelException if the channel is no longer valid. + */ + private BlockingQueue<BasicPacket> ensureValidQ () throws InvalidChannelException { + BlockingQueue<BasicPacket> q = responseQueue; + if (q != null) { + return q; + } + throw new InvalidChannelException("Channel is no longer valid"); + } + + public String toString() { + return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]"); + } + + public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) { + this.monitor = monitor; + } + + protected void notifyMonitor() { + if(monitor != null) { + monitor.responseAvailable(this); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java new file mode 100644 index 00000000000..7dcbefde9fa --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java @@ -0,0 +1,371 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.fs4.mplex; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.BufferTooSmallException; +import com.yahoo.fs4.PacketDecoder; +import com.yahoo.fs4.PacketListener; +import com.yahoo.io.Connection; +import com.yahoo.io.Listener; +import com.yahoo.log.LogLevel; + +/** + * + * This class is used to represent a connection to an fdispatch + * + * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a> + */ +public class FS4Connection implements Connection +{ + private static Logger log = Logger.getLogger(FS4Connection.class.getName()); + private Backend backend; + private Listener listener; + private SocketChannel channel; + + private boolean shouldWrite = false; + + private static int idCounter = 1; + private int idNumber; + + // outbound data + private ByteBuffer writeBuffer; + private LinkedList<ByteBuffer> writeBufferList = new LinkedList<>(); + + // inbound data + private ByteBuffer fixedReadBuffer = ByteBuffer.allocateDirect(256 * 1024); + private ByteBuffer readBuffer = fixedReadBuffer; + + private volatile boolean valid = true; + + private final PacketListener packetListener; + + + /** + * Create an FS4 Connection. + */ + public FS4Connection (SocketChannel channel, Listener listener, Backend backend, PacketListener packetListener) { + this.backend = backend; + this.listener = listener; + this.channel = channel; + this.idNumber = idCounter++; + this.packetListener = packetListener; + + log.log(Level.FINER, "new: "+this+", id="+idNumber + ", address=" + backend.getAddress()); + } + + + /** + * Packet sending interface. + */ + public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { + ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); + ByteBuffer viewForPacketListener = buffer.slice(); + synchronized (this) { + if (!(valid && channel.isOpen())) { + throw new IllegalStateException("Connection is not valid. " + + "Address = " + backend.getAddress() + + ", valid = " + valid + + ", isOpen = " + channel.isOpen()); + } + + if (writeBuffer == null) { + writeBuffer = buffer; + } else { + writeBufferList.addLast(buffer); + enableWrite(); + } + write(); + } + + if (packetListener != null) + packetListener.packetSent(backend.getChannel(channelId), packet, viewForPacketListener); + } + + + /** + * The write event handler. This can be called both from the client + * thread and from the IO thread, so it needs to be synchronized. It + * assumes that IO is nonblocking, and will attempt to keep writing + * data until the system won't accept more data. + * + */ + public synchronized void write () throws IOException { + if (! channel.isOpen()) { + throw new IllegalStateException("Channel not open in write(), address=" + backend.getAddress()); + } + + try { + int bytesWritten = 0; + boolean isFinished = false; + do { + // if writeBuffer is not set we need to fetch the next buffer + if (writeBuffer == null) { + + // if the list is empty, signal the selector we do not need + // to do any writing for a while yet and bail + if (writeBufferList.isEmpty()) { + disableWrite(); + isFinished = true; + break; + } + writeBuffer = writeBufferList.removeFirst(); + } + + // invariants: we have a writeBuffer + bytesWritten = channel.write(writeBuffer); + + // buffer drained so we forget it and see what happens when we + // go around. if indeed we go around + if (!writeBuffer.hasRemaining()) { + writeBuffer.clear(); + backend.getBufferPool().free(writeBuffer); + writeBuffer = null; + } + } while (bytesWritten > 0); + if (!isFinished) { + enableWrite(); + } + } catch (IOException e) { + log.log(LogLevel.DEBUG, "Failed writing to channel for backend " + backend.getAddress() + + ". Closing channel", e); + try { + close(); + } catch (IOException ignored) {} + + throw e; + } + } + + + private void disableWrite() { + if (shouldWrite) { + listener.modifyInterestOpsBatch(this, SelectionKey.OP_WRITE, false); + shouldWrite = false; + } + } + + + private void enableWrite() { + if (!shouldWrite) { + listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true); + shouldWrite = true; + } + } + + + + public void read () throws IOException { + if (! channel.isOpen()) { + throw new IOException("Channel not open in read(), address=" + backend.getAddress()); + } + + int bytesRead = 0; + + do { + try { + if (readBuffer == fixedReadBuffer) { + bytesRead = channel.read(readBuffer); + } else { + fixedReadBuffer.clear(); + if (readBuffer.remaining() < fixedReadBuffer.capacity()) { + fixedReadBuffer.limit(readBuffer.remaining()); + } + bytesRead = channel.read(fixedReadBuffer); + fixedReadBuffer.flip(); + readBuffer.put(fixedReadBuffer); + fixedReadBuffer.clear(); + } + } + catch (IOException e) { + // this is the "normal" way that connection closes. + log.log(Level.FINER, "Read exception address=" + backend.getAddress() + " id="+idNumber+": "+ + e.getClass().getName()+" / ", e); + bytesRead = -1; + } + + // end of file + if (bytesRead == -1) { + log.log(LogLevel.DEBUG, "Dispatch closed connection" + + " (id="+idNumber+", address=" + backend.getAddress() + ")"); + try { + close(); + } catch (Exception e) { + log.log(Level.WARNING, "Close failed, address=" + backend.getAddress(), e); + } + } + + // no more read + if (bytesRead == 0) { + // buffer too small? + if (! readBuffer.hasRemaining()) { + log.fine("Buffer possibly too small, extending"); + readBuffer.flip(); + extendReadBuffer(readBuffer.capacity() * 2); + } + } + + } while (bytesRead > 0); + + readBuffer.flip(); + + // hand off packet extraction + extractPackets(readBuffer); + } + + private void extractPackets(ByteBuffer readBuffer) { + for (;;) { + PacketDecoder.DecodedPacket packet = null; + try { + FS4Channel receiver = null; + int queryId = PacketDecoder.sniffChannel(readBuffer); + if (queryId == 0) { + if (PacketDecoder.isPongPacket(readBuffer)) + receiver = backend.getPingChannel(); + } + else { + receiver = backend.getChannel(Integer.valueOf(queryId)); + } + packet = PacketDecoder.extractPacket(readBuffer); + + if (packet != null) + packetListener.packetReceived(receiver, packet.packet, packet.consumedBytes); + } + catch (BufferTooSmallException e) { + log.fine("Unable to decode, extending readBuffer"); + extendReadBuffer(PacketDecoder.packetLength(readBuffer)); + return; + } + + // break out of loop if we did not get a packet out of the + // buffer so we can select and read some more + if (packet == null) { + + // if the buffer has been cleared, we can do a reset + // of the readBuffer + if ((readBuffer.position() == 0) + && (readBuffer.limit() == readBuffer.capacity())) + { + resetReadBuffer(); + } + break; + } + + backend.receivePacket(packet.packet); + } + } + + /** + * This is called when we close the connection to do any + * pending cleanup work. Closing a connection marks it as + * not valid. + */ + public void close () throws IOException { + valid = false; + channel.close(); + log.log(Level.FINER, "invalidated id="+idNumber + " address=" + backend.getAddress()); + } + + /** + * Upon asynchronous connect completion this method is called by + * the Listener. + */ + public void connect() throws IOException { + throw new RuntimeException("connect() was called, address=" + backend.getAddress() + ". " + + "asynchronous connect in NIO is flawed!"); + } + + /** + * Since we are performing an asynchronous connect we are initially + * only interested in the <code>OP_CONNECT</code> event. + */ + public int selectOps () { + return SelectionKey.OP_READ; + } + + /** + * Return the underlying SocketChannel used by this connection. + */ + public SocketChannel socketChannel() { + return channel; + } + + + public String toString () { + return FS4Connection.class.getName() + "/" + channel; + } + + + //============================================================ + //==== readbuffer management + //============================================================ + + + /** + * Extend the readBuffer. Make a new buffer of the requested size + * copy the contents of the readBuffer into it and assign reference + * to readBuffer instance variable. + * + * <P> + * <b>The readBuffer needs to be in "readable" (flipped) state before + * this is called and it will be in the "writeable" state when it + * returns.</b> + */ + private void extendReadBuffer (int size) { + // we specifically check this because packetLength() can return -1 + // and someone might alter the code so that we do in fact get -1 + // ...which never happens as the code is now + // + if (size == -1) { + throw new RuntimeException("Invalid buffer size requested: -1"); + } + + // if we get a size that is smaller than the current + // readBuffer capacity we just double it. not sure how wise this + // might be. + // + if (size < readBuffer.capacity()) { + size = readBuffer.capacity() * 2; + } + + ByteBuffer tmp = ByteBuffer.allocate(size); + tmp.put(readBuffer); + log.fine("Extended readBuffer to " + size + " bytes" + + "from " + readBuffer.capacity() + " bytes"); + readBuffer = tmp; + } + + /** + * Clear the readBuffer, and if temporarily allocated bigger + * buffer is in use: ditch it and reset the reference to the + * fixed readBuffer. + */ + private void resetReadBuffer () { + fixedReadBuffer.clear(); + if (readBuffer == fixedReadBuffer) { + return; + } + log.fine("Resetting readbuffer"); + readBuffer = fixedReadBuffer; + } + + /** + * This method is used to determine whether the connection is still + * viable or not. All connections are initially valid, but they + * become invalid if we close the connection or something bad happens + * and the connection needs to be ditched. + */ + public boolean isValid() { + return valid; + } + +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java new file mode 100644 index 00000000000..6176d069645 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/InvalidChannelException.java @@ -0,0 +1,15 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// -*- mode: java; folded-file: t; c-basic-offset: 4 -*- + +package com.yahoo.fs4.mplex; + +/** + * @author <a href="mailto:borud@yahoo-inc.com">Bj\u00f8rn Borud</a> + */ +@SuppressWarnings("serial") +public class InvalidChannelException extends Exception +{ + public InvalidChannelException (String message) { + super(message); + } +} diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java new file mode 100644 index 00000000000..d76d270dcb7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/ListenerPool.java @@ -0,0 +1,56 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.fs4.mplex; + +import com.yahoo.io.FatalErrorHandler; +import com.yahoo.io.Listener; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Pool of com.yahoo.io.Listener instances for shared use by Vespa backend + * searchers. + * + * @author baldersheim + * @since 5.3.0 + */ +public final class ListenerPool { + private final static Logger logger = Logger.getLogger(ListenerPool.class.getName()); + private final List<Listener> listeners; + + public ListenerPool(String name, int numListeners) { + listeners = new ArrayList<>(numListeners); + FatalErrorHandler fatalErrorHandler = new FatalErrorHandler(); + for (int i = 0; i < numListeners; i++) { + Listener listener = new Listener(name + "-" + i); + listener.setFatalErrorHandler(fatalErrorHandler); + listener.start(); + listeners.add(listener); + } + } + + public Listener get(int index) { + return listeners.get(index); + } + + public int size() { + return listeners.size(); + } + + public void close() { + for (Listener listener : listeners) { + listener.interrupt(); + } + try { + for (Listener listener : listeners) { + listener.join(); + } + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Got interrupted", e); + Thread.currentThread().interrupt(); + } + } + +} |