diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java | 371 |
1 files changed, 0 insertions, 371 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java deleted file mode 100644 index 7dcbefde9fa..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.fs4.mplex; - - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.LinkedList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.BufferTooSmallException; -import com.yahoo.fs4.PacketDecoder; -import com.yahoo.fs4.PacketListener; -import com.yahoo.io.Connection; -import com.yahoo.io.Listener; -import com.yahoo.log.LogLevel; - -/** - * - * This class is used to represent a connection to an fdispatch - * - * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a> - */ -public class FS4Connection implements Connection -{ - private static Logger log = Logger.getLogger(FS4Connection.class.getName()); - private Backend backend; - private Listener listener; - private SocketChannel channel; - - private boolean shouldWrite = false; - - private static int idCounter = 1; - private int idNumber; - - // outbound data - private ByteBuffer writeBuffer; - private LinkedList<ByteBuffer> writeBufferList = new LinkedList<>(); - - // inbound data - private ByteBuffer fixedReadBuffer = ByteBuffer.allocateDirect(256 * 1024); - private ByteBuffer readBuffer = fixedReadBuffer; - - private volatile boolean valid = true; - - private final PacketListener packetListener; - - - /** - * Create an FS4 Connection. - */ - public FS4Connection (SocketChannel channel, Listener listener, Backend backend, PacketListener packetListener) { - this.backend = backend; - this.listener = listener; - this.channel = channel; - this.idNumber = idCounter++; - this.packetListener = packetListener; - - log.log(Level.FINER, "new: "+this+", id="+idNumber + ", address=" + backend.getAddress()); - } - - - /** - * Packet sending interface. - */ - public void sendPacket (BasicPacket packet, Integer channelId) throws IOException { - ByteBuffer buffer = packet.grantEncodingBuffer(channelId.intValue(), backend.getBufferPool().alloc()); - ByteBuffer viewForPacketListener = buffer.slice(); - synchronized (this) { - if (!(valid && channel.isOpen())) { - throw new IllegalStateException("Connection is not valid. " + - "Address = " + backend.getAddress() + - ", valid = " + valid + - ", isOpen = " + channel.isOpen()); - } - - if (writeBuffer == null) { - writeBuffer = buffer; - } else { - writeBufferList.addLast(buffer); - enableWrite(); - } - write(); - } - - if (packetListener != null) - packetListener.packetSent(backend.getChannel(channelId), packet, viewForPacketListener); - } - - - /** - * The write event handler. This can be called both from the client - * thread and from the IO thread, so it needs to be synchronized. It - * assumes that IO is nonblocking, and will attempt to keep writing - * data until the system won't accept more data. - * - */ - public synchronized void write () throws IOException { - if (! channel.isOpen()) { - throw new IllegalStateException("Channel not open in write(), address=" + backend.getAddress()); - } - - try { - int bytesWritten = 0; - boolean isFinished = false; - do { - // if writeBuffer is not set we need to fetch the next buffer - if (writeBuffer == null) { - - // if the list is empty, signal the selector we do not need - // to do any writing for a while yet and bail - if (writeBufferList.isEmpty()) { - disableWrite(); - isFinished = true; - break; - } - writeBuffer = writeBufferList.removeFirst(); - } - - // invariants: we have a writeBuffer - bytesWritten = channel.write(writeBuffer); - - // buffer drained so we forget it and see what happens when we - // go around. if indeed we go around - if (!writeBuffer.hasRemaining()) { - writeBuffer.clear(); - backend.getBufferPool().free(writeBuffer); - writeBuffer = null; - } - } while (bytesWritten > 0); - if (!isFinished) { - enableWrite(); - } - } catch (IOException e) { - log.log(LogLevel.DEBUG, "Failed writing to channel for backend " + backend.getAddress() + - ". Closing channel", e); - try { - close(); - } catch (IOException ignored) {} - - throw e; - } - } - - - private void disableWrite() { - if (shouldWrite) { - listener.modifyInterestOpsBatch(this, SelectionKey.OP_WRITE, false); - shouldWrite = false; - } - } - - - private void enableWrite() { - if (!shouldWrite) { - listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true); - shouldWrite = true; - } - } - - - - public void read () throws IOException { - if (! channel.isOpen()) { - throw new IOException("Channel not open in read(), address=" + backend.getAddress()); - } - - int bytesRead = 0; - - do { - try { - if (readBuffer == fixedReadBuffer) { - bytesRead = channel.read(readBuffer); - } else { - fixedReadBuffer.clear(); - if (readBuffer.remaining() < fixedReadBuffer.capacity()) { - fixedReadBuffer.limit(readBuffer.remaining()); - } - bytesRead = channel.read(fixedReadBuffer); - fixedReadBuffer.flip(); - readBuffer.put(fixedReadBuffer); - fixedReadBuffer.clear(); - } - } - catch (IOException e) { - // this is the "normal" way that connection closes. - log.log(Level.FINER, "Read exception address=" + backend.getAddress() + " id="+idNumber+": "+ - e.getClass().getName()+" / ", e); - bytesRead = -1; - } - - // end of file - if (bytesRead == -1) { - log.log(LogLevel.DEBUG, "Dispatch closed connection" - + " (id="+idNumber+", address=" + backend.getAddress() + ")"); - try { - close(); - } catch (Exception e) { - log.log(Level.WARNING, "Close failed, address=" + backend.getAddress(), e); - } - } - - // no more read - if (bytesRead == 0) { - // buffer too small? - if (! readBuffer.hasRemaining()) { - log.fine("Buffer possibly too small, extending"); - readBuffer.flip(); - extendReadBuffer(readBuffer.capacity() * 2); - } - } - - } while (bytesRead > 0); - - readBuffer.flip(); - - // hand off packet extraction - extractPackets(readBuffer); - } - - private void extractPackets(ByteBuffer readBuffer) { - for (;;) { - PacketDecoder.DecodedPacket packet = null; - try { - FS4Channel receiver = null; - int queryId = PacketDecoder.sniffChannel(readBuffer); - if (queryId == 0) { - if (PacketDecoder.isPongPacket(readBuffer)) - receiver = backend.getPingChannel(); - } - else { - receiver = backend.getChannel(Integer.valueOf(queryId)); - } - packet = PacketDecoder.extractPacket(readBuffer); - - if (packet != null) - packetListener.packetReceived(receiver, packet.packet, packet.consumedBytes); - } - catch (BufferTooSmallException e) { - log.fine("Unable to decode, extending readBuffer"); - extendReadBuffer(PacketDecoder.packetLength(readBuffer)); - return; - } - - // break out of loop if we did not get a packet out of the - // buffer so we can select and read some more - if (packet == null) { - - // if the buffer has been cleared, we can do a reset - // of the readBuffer - if ((readBuffer.position() == 0) - && (readBuffer.limit() == readBuffer.capacity())) - { - resetReadBuffer(); - } - break; - } - - backend.receivePacket(packet.packet); - } - } - - /** - * This is called when we close the connection to do any - * pending cleanup work. Closing a connection marks it as - * not valid. - */ - public void close () throws IOException { - valid = false; - channel.close(); - log.log(Level.FINER, "invalidated id="+idNumber + " address=" + backend.getAddress()); - } - - /** - * Upon asynchronous connect completion this method is called by - * the Listener. - */ - public void connect() throws IOException { - throw new RuntimeException("connect() was called, address=" + backend.getAddress() + ". " - + "asynchronous connect in NIO is flawed!"); - } - - /** - * Since we are performing an asynchronous connect we are initially - * only interested in the <code>OP_CONNECT</code> event. - */ - public int selectOps () { - return SelectionKey.OP_READ; - } - - /** - * Return the underlying SocketChannel used by this connection. - */ - public SocketChannel socketChannel() { - return channel; - } - - - public String toString () { - return FS4Connection.class.getName() + "/" + channel; - } - - - //============================================================ - //==== readbuffer management - //============================================================ - - - /** - * Extend the readBuffer. Make a new buffer of the requested size - * copy the contents of the readBuffer into it and assign reference - * to readBuffer instance variable. - * - * <P> - * <b>The readBuffer needs to be in "readable" (flipped) state before - * this is called and it will be in the "writeable" state when it - * returns.</b> - */ - private void extendReadBuffer (int size) { - // we specifically check this because packetLength() can return -1 - // and someone might alter the code so that we do in fact get -1 - // ...which never happens as the code is now - // - if (size == -1) { - throw new RuntimeException("Invalid buffer size requested: -1"); - } - - // if we get a size that is smaller than the current - // readBuffer capacity we just double it. not sure how wise this - // might be. - // - if (size < readBuffer.capacity()) { - size = readBuffer.capacity() * 2; - } - - ByteBuffer tmp = ByteBuffer.allocate(size); - tmp.put(readBuffer); - log.fine("Extended readBuffer to " + size + " bytes" - + "from " + readBuffer.capacity() + " bytes"); - readBuffer = tmp; - } - - /** - * Clear the readBuffer, and if temporarily allocated bigger - * buffer is in use: ditch it and reset the reference to the - * fixed readBuffer. - */ - private void resetReadBuffer () { - fixedReadBuffer.clear(); - if (readBuffer == fixedReadBuffer) { - return; - } - log.fine("Resetting readbuffer"); - readBuffer = fixedReadBuffer; - } - - /** - * This method is used to determine whether the connection is still - * viable or not. All connections are initially valid, but they - * become invalid if we close the connection or something bad happens - * and the connection needs to be ditched. - */ - public boolean isValid() { - return valid; - } - -} |