diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java | 255 |
1 files changed, 0 insertions, 255 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java deleted file mode 100644 index adfc63d02f7..00000000000 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java +++ /dev/null @@ -1,255 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.fs4.mplex; - -import com.yahoo.concurrent.SystemTimer; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.Packet; -import com.yahoo.search.Query; -import com.yahoo.search.dispatch.ResponseMonitor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * This class is used to represent a "channel" in the FS4 protocol. - * A channel represents a session between a client and the fdispatch. - * Internally this class has a response queue used by the backend - * for queueing up FS4 packets that belong to this channel (or - * <em>session</em>, which might be a more appropriate name for it). - * Outbound packets are handed off to the FS4Connection. - * - * @author Bjorn Borud - */ -public class FS4Channel { - - private static Logger log = Logger.getLogger(FS4Channel.class.getName()); - - private Integer channelId; - private Backend backend; - volatile private BlockingQueue<BasicPacket> responseQueue; - private Query query; - private boolean isPingChannel = false; - private ResponseMonitor<FS4Channel> monitor; - - /** for unit testing. do not use */ - protected FS4Channel () { - } - - protected FS4Channel(Backend backend, Integer channelId) { - this.channelId = channelId; - this.backend = backend; - this.responseQueue = new LinkedBlockingQueue<>(); - } - - static public FS4Channel createPingChannel(Backend backend) { - FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0)); - pingChannel.isPingChannel = true; - return pingChannel; - } - - /** Set the query currently associated with this channel */ - public void setQuery(Query query) { - this.query = query; - } - - /** Get the query currently associated with this channel */ - public Query getQuery() { - return query; - } - - /** Returns the (fs4) channel id */ - public Integer getChannelId () { - return channelId; - } - - /** - * Closes the channel - */ - public void close () { - BlockingQueue<BasicPacket> q = responseQueue; - responseQueue = null; - query = null; - if (isPingChannel) { - backend.removePingChannel(); - } else { - backend.removeChannel(channelId); - } - if (q != null) { - q.clear(); - } - } - - /** - * Legacy interface. - */ - public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { - ensureValid(); - return backend.sendPacket(packet, channelId); - } - - /** - * Receives the given number of packets and returns them, OR - * <ul> - * <li>Returns a smaller number of packets if an error or eol packet is received - * <li>Throws a ChannelTimeoutException if timeout occurs before all packets - * are received. Packets received with the wrong channel id are ignored. - * </ul> - * - * @param timeout the number of ms to attempt to get packets before throwing an exception - * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error - */ - public BasicPacket[] receivePackets(long timeout, int packetCount) - throws InvalidChannelException, ChannelTimeoutException { - ensureValid(); - - List<BasicPacket> packets = new ArrayList<>(12); - long startTime = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeout; - - try { - while (timeLeft >= 0) { - BasicPacket p = nextPacket(timeLeft); - if (p == null) throw new ChannelTimeoutException("Timed out"); - - if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) { - log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId()); - continue; - } - - packets.add(p); - if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) { - BasicPacket[] packetArray = new BasicPacket[packets.size()]; - packets.toArray(packetArray); - return packetArray; - } - - // doing this last might save us one system call for the last - // packet. - timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime); - } - } - catch (InvalidChannelException e) { - // nop. if we get this we want to return the default - // zero length packet array indicating that we have no - // valid response - log.info("FS4Channel was invalid. timeLeft=" - + timeLeft + ", timeout=" + timeout); - } - catch (InterruptedException e) { - log.info("FS4Channel was interrupted. timeLeft=" - + timeLeft + ", timeout=" + timeout); - Thread.currentThread().interrupt(); - } - - // default return, we only hit this if we timed out and - // did not get the end of the packet stream - throw new ChannelTimeoutException(); - } - - private static boolean hasEnoughPackets(int packetCount,List<BasicPacket> packets) { - if (packetCount<0) return false; - return packets.size()>=packetCount; - } - - /** - * Returns true if we will definitely receive more packets on this stream - * - * Shouldn't that be "_not_ receive more packets"? - */ - private static boolean isLastPacket (BasicPacket packet) { - if (packet instanceof com.yahoo.fs4.ErrorPacket) return true; - if (packet instanceof com.yahoo.fs4.EolPacket) return true; - if (packet instanceof com.yahoo.fs4.PongPacket) return true; - return false; - } - - /** - * Return the next available packet from the response queue. If there - * are no packets available we wait a maximum of <code>timeout</code> - * milliseconds before returning a <code>null</code> - * - * @param timeout Number of milliseconds to wait for a packet - * to become available. - * - * @return Returns the next available <code>BasicPacket</code> or - * <code>null</code> if we timed out. - */ - public BasicPacket nextPacket(long timeout) - throws InterruptedException, InvalidChannelException - { - return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS); - } - - /** - * Add incoming packet to the response queue. This is to be used - * by the listener for placing incoming packets in the response - * queue. - * - * @param packet BasicPacket to be placed in the response queue. - * - */ - protected void addPacket (BasicPacket packet) - throws InterruptedException, InvalidChannelException - { - ensureValidQ().put(packet); - notifyMonitor(); - } - - /** - * A valid FS4Channel is one that has not yet been closed. - * - * @return Returns <code>true</code> if the FS4Channel is valid. - */ - public boolean isValid () { - return responseQueue != null; - } - - /** - * This method is called whenever we want to perform an operation - * which assumes that the FS4Channel object is valid. An exception - * is thrown if the opposite turns out to be the case. - * - * @throws InvalidChannelException if the channel is no longer valid. - */ - private void ensureValid () throws InvalidChannelException { - if (isValid()) { - return; - } - throw new InvalidChannelException("Channel is no longer valid"); - } - - /** - * This method is called whenever we want to perform an operation - * which assumes that the FS4Channel object is valid. An exception - * is thrown if the opposite turns out to be the case. - * - * @throws InvalidChannelException if the channel is no longer valid. - */ - private BlockingQueue<BasicPacket> ensureValidQ () throws InvalidChannelException { - BlockingQueue<BasicPacket> q = responseQueue; - if (q != null) { - return q; - } - throw new InvalidChannelException("Channel is no longer valid"); - } - - public String toString() { - return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]"); - } - - public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) { - this.monitor = monitor; - } - - protected void notifyMonitor() { - if(monitor != null) { - monitor.responseAvailable(this); - } - } -} |