aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java')
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java255
1 files changed, 255 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java
new file mode 100644
index 00000000000..adfc63d02f7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java
@@ -0,0 +1,255 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.fs4.mplex;
+
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.Packet;
+import com.yahoo.search.Query;
+import com.yahoo.search.dispatch.ResponseMonitor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * This class is used to represent a "channel" in the FS4 protocol.
+ * A channel represents a session between a client and the fdispatch.
+ * Internally this class has a response queue used by the backend
+ * for queueing up FS4 packets that belong to this channel (or
+ * <em>session</em>, which might be a more appropriate name for it).
+ * Outbound packets are handed off to the FS4Connection.
+ *
+ * @author Bjorn Borud
+ */
+public class FS4Channel {
+
+ private static Logger log = Logger.getLogger(FS4Channel.class.getName());
+
+ private Integer channelId;
+ private Backend backend;
+ volatile private BlockingQueue<BasicPacket> responseQueue;
+ private Query query;
+ private boolean isPingChannel = false;
+ private ResponseMonitor<FS4Channel> monitor;
+
+ /** for unit testing. do not use */
+ protected FS4Channel () {
+ }
+
+ protected FS4Channel(Backend backend, Integer channelId) {
+ this.channelId = channelId;
+ this.backend = backend;
+ this.responseQueue = new LinkedBlockingQueue<>();
+ }
+
+ static public FS4Channel createPingChannel(Backend backend) {
+ FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0));
+ pingChannel.isPingChannel = true;
+ return pingChannel;
+ }
+
+ /** Set the query currently associated with this channel */
+ public void setQuery(Query query) {
+ this.query = query;
+ }
+
+ /** Get the query currently associated with this channel */
+ public Query getQuery() {
+ return query;
+ }
+
+ /** Returns the (fs4) channel id */
+ public Integer getChannelId () {
+ return channelId;
+ }
+
+ /**
+ * Closes the channel
+ */
+ public void close () {
+ BlockingQueue<BasicPacket> q = responseQueue;
+ responseQueue = null;
+ query = null;
+ if (isPingChannel) {
+ backend.removePingChannel();
+ } else {
+ backend.removeChannel(channelId);
+ }
+ if (q != null) {
+ q.clear();
+ }
+ }
+
+ /**
+ * Legacy interface.
+ */
+ public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
+ ensureValid();
+ return backend.sendPacket(packet, channelId);
+ }
+
+ /**
+ * Receives the given number of packets and returns them, OR
+ * <ul>
+ * <li>Returns a smaller number of packets if an error or eol packet is received
+ * <li>Throws a ChannelTimeoutException if timeout occurs before all packets
+ * are received. Packets received with the wrong channel id are ignored.
+ * </ul>
+ *
+ * @param timeout the number of ms to attempt to get packets before throwing an exception
+ * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error
+ */
+ public BasicPacket[] receivePackets(long timeout, int packetCount)
+ throws InvalidChannelException, ChannelTimeoutException {
+ ensureValid();
+
+ List<BasicPacket> packets = new ArrayList<>(12);
+ long startTime = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeout;
+
+ try {
+ while (timeLeft >= 0) {
+ BasicPacket p = nextPacket(timeLeft);
+ if (p == null) throw new ChannelTimeoutException("Timed out");
+
+ if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) {
+ log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId());
+ continue;
+ }
+
+ packets.add(p);
+ if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) {
+ BasicPacket[] packetArray = new BasicPacket[packets.size()];
+ packets.toArray(packetArray);
+ return packetArray;
+ }
+
+ // doing this last might save us one system call for the last
+ // packet.
+ timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime);
+ }
+ }
+ catch (InvalidChannelException e) {
+ // nop. if we get this we want to return the default
+ // zero length packet array indicating that we have no
+ // valid response
+ log.info("FS4Channel was invalid. timeLeft="
+ + timeLeft + ", timeout=" + timeout);
+ }
+ catch (InterruptedException e) {
+ log.info("FS4Channel was interrupted. timeLeft="
+ + timeLeft + ", timeout=" + timeout);
+ Thread.currentThread().interrupt();
+ }
+
+ // default return, we only hit this if we timed out and
+ // did not get the end of the packet stream
+ throw new ChannelTimeoutException();
+ }
+
+ private static boolean hasEnoughPackets(int packetCount,List<BasicPacket> packets) {
+ if (packetCount<0) return false;
+ return packets.size()>=packetCount;
+ }
+
+ /**
+ * Returns true if we will definitely receive more packets on this stream
+ *
+ * Shouldn't that be "_not_ receive more packets"?
+ */
+ private static boolean isLastPacket (BasicPacket packet) {
+ if (packet instanceof com.yahoo.fs4.ErrorPacket) return true;
+ if (packet instanceof com.yahoo.fs4.EolPacket) return true;
+ if (packet instanceof com.yahoo.fs4.PongPacket) return true;
+ return false;
+ }
+
+ /**
+ * Return the next available packet from the response queue. If there
+ * are no packets available we wait a maximum of <code>timeout</code>
+ * milliseconds before returning a <code>null</code>
+ *
+ * @param timeout Number of milliseconds to wait for a packet
+ * to become available.
+ *
+ * @return Returns the next available <code>BasicPacket</code> or
+ * <code>null</code> if we timed out.
+ */
+ public BasicPacket nextPacket(long timeout)
+ throws InterruptedException, InvalidChannelException
+ {
+ return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Add incoming packet to the response queue. This is to be used
+ * by the listener for placing incoming packets in the response
+ * queue.
+ *
+ * @param packet BasicPacket to be placed in the response queue.
+ *
+ */
+ protected void addPacket (BasicPacket packet)
+ throws InterruptedException, InvalidChannelException
+ {
+ ensureValidQ().put(packet);
+ notifyMonitor();
+ }
+
+ /**
+ * A valid FS4Channel is one that has not yet been closed.
+ *
+ * @return Returns <code>true</code> if the FS4Channel is valid.
+ */
+ public boolean isValid () {
+ return responseQueue != null;
+ }
+
+ /**
+ * This method is called whenever we want to perform an operation
+ * which assumes that the FS4Channel object is valid. An exception
+ * is thrown if the opposite turns out to be the case.
+ *
+ * @throws InvalidChannelException if the channel is no longer valid.
+ */
+ private void ensureValid () throws InvalidChannelException {
+ if (isValid()) {
+ return;
+ }
+ throw new InvalidChannelException("Channel is no longer valid");
+ }
+
+ /**
+ * This method is called whenever we want to perform an operation
+ * which assumes that the FS4Channel object is valid. An exception
+ * is thrown if the opposite turns out to be the case.
+ *
+ * @throws InvalidChannelException if the channel is no longer valid.
+ */
+ private BlockingQueue<BasicPacket> ensureValidQ () throws InvalidChannelException {
+ BlockingQueue<BasicPacket> q = responseQueue;
+ if (q != null) {
+ return q;
+ }
+ throw new InvalidChannelException("Channel is no longer valid");
+ }
+
+ public String toString() {
+ return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]");
+ }
+
+ public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) {
+ this.monitor = monitor;
+ }
+
+ protected void notifyMonitor() {
+ if(monitor != null) {
+ monitor.responseAvailable(this);
+ }
+ }
+}