aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java
blob: adfc63d02f7d73254e51e89a60dca1336acf38a7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.fs4.mplex;

import com.yahoo.concurrent.SystemTimer;
import com.yahoo.fs4.BasicPacket;
import com.yahoo.fs4.ChannelTimeoutException;
import com.yahoo.fs4.Packet;
import com.yahoo.search.Query;
import com.yahoo.search.dispatch.ResponseMonitor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * This class is used to represent a "channel" in the FS4 protocol.
 * A channel represents a session between a client and the fdispatch.
 * Internally this class has a  response queue used by the backend
 * for queueing up FS4 packets that belong to this channel (or
 * <em>session</em>, which might be a more appropriate name for it).
 * Outbound packets are handed off to the FS4Connection.
 *
 * @author Bjorn Borud
 */
public class FS4Channel {

    private static final Logger log = Logger.getLogger(FS4Channel.class.getName());

    private Integer channelId;
    private Backend backend;

    /**
     * Packets received for this channel. Set to <code>null</code> by
     * {@link #close()}; a <code>null</code> queue is what marks the
     * channel as invalid (see {@link #isValid()}).
     */
    private volatile BlockingQueue<BasicPacket> responseQueue;
    private Query query;
    private boolean isPingChannel = false;
    private ResponseMonitor<FS4Channel> monitor;

    /** for unit testing.  do not use */
    protected FS4Channel () {
    }

    /**
     * Creates a channel with the given id on the given backend, with an
     * empty response queue.
     *
     * @param backend the backend packets are sent through and which is told when this channel closes
     * @param channelId the (fs4) channel id
     */
    protected FS4Channel(Backend backend, Integer channelId) {
        this.channelId = channelId;
        this.backend = backend;
        this.responseQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a channel flagged as a ping channel. It uses channel id 0 and
     * accepts packets without checking their channel id.
     *
     * @param backend the backend this ping channel belongs to
     * @return the new ping channel
     */
    static public FS4Channel createPingChannel(Backend backend) {
        FS4Channel pingChannel = new FS4Channel(backend, Integer.valueOf(0));
        pingChannel.isPingChannel = true;
        return pingChannel;
    }

    /** Set the query currently associated with this channel */
    public void setQuery(Query query) {
        this.query = query;
    }

    /** Get the query currently associated with this channel */
    public Query getQuery() {
        return query;
    }

    /** Returns the (fs4) channel id */
    public Integer getChannelId () {
        return channelId;
    }

    /**
     * Closes the channel: marks it invalid, drops the query reference,
     * deregisters it from the backend and discards any queued packets.
     */
    public void close () {
        // Detach the queue first so that isValid() turns false before
        // the channel is removed from the backend.
        BlockingQueue<BasicPacket> q = responseQueue;
        responseQueue = null;
        query = null;
        if (isPingChannel) {
            backend.removePingChannel();
        } else {
            backend.removeChannel(channelId);
        }
        if (q != null) {
            q.clear();
        }
    }

    /**
     * Legacy interface. Hands the packet to the backend for sending on
     * this channel.
     *
     * @param packet the packet to send
     * @return whatever the backend's <code>sendPacket</code> returns
     * @throws InvalidChannelException if the channel has been closed
     * @throws IOException declared by the backend's <code>sendPacket</code>
     */
    public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
        ensureValid();
        return backend.sendPacket(packet, channelId);
    }

    /**
     * Receives the given number of packets and returns them, OR
     * <ul>
     * <li>Returns a smaller number of packets if an error or eol packet is received
     * <li>Throws a ChannelTimeoutException if timeout occurs before all packets
     * are received. Packets received with the wrong channel id are ignored.
     * </ul>
     * A ChannelTimeoutException is also thrown if the channel is closed or
     * the calling thread is interrupted while waiting; in the latter case
     * the thread's interrupt flag is set again before throwing.
     *
     * @param timeout the number of ms to attempt to get packets before throwing an exception
     * @param packetCount the number of packets to receive, or -1 to receive any number up to eol/error
     * @return the packets received, in arrival order
     * @throws InvalidChannelException if the channel is already closed when this is called
     * @throws ChannelTimeoutException if the packets could not be collected within the timeout
     */
    public BasicPacket[] receivePackets(long timeout, int packetCount)
            throws InvalidChannelException, ChannelTimeoutException {
        ensureValid();

        List<BasicPacket> packets = new ArrayList<>(12);
        long startTime = SystemTimer.INSTANCE.milliTime();
        long timeLeft  = timeout;

        try {
            while (timeLeft >= 0) {
                BasicPacket p = nextPacket(timeLeft);
                if (p == null) throw new ChannelTimeoutException("Timed out");

                if (!isPingChannel && ((Packet)p).getChannel() != getChannelId().intValue()) {
                    // Do not `continue` here: the remaining time must still be
                    // recomputed below, otherwise a stream of stray packets
                    // would keep this loop waiting past the timeout.
                    log.warning("Ignoring received " + p + ", when excepting channel " + getChannelId());
                } else {
                    packets.add(p);
                    if (isLastPacket(p) || hasEnoughPackets(packetCount, packets)) {
                        return packets.toArray(new BasicPacket[packets.size()]);
                    }
                }

                // doing this last might save us one system call for the last
                // packet.
                timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime);
            }
        }
        catch (InvalidChannelException e) {
            // The channel was closed while we were waiting. Nothing to
            // return; fall through to the ChannelTimeoutException below.
            log.info("FS4Channel was invalid. timeLeft="
                     + timeLeft + ", timeout=" + timeout);
        }
        catch (InterruptedException e) {
            log.info("FS4Channel was interrupted. timeLeft="
                     + timeLeft + ", timeout=" + timeout);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }

        // default exit, we only hit this if we ran out of time, the channel
        // was closed or we were interrupted before the end of the packet stream
        throw new ChannelTimeoutException();
    }

    /**
     * Returns true if a specific packet count was requested (non-negative)
     * and at least that many packets have been collected.
     */
    private static boolean hasEnoughPackets(int packetCount,List<BasicPacket> packets) {
        if (packetCount<0) return false;
        return packets.size()>=packetCount;
    }

    /**
     * Returns true if the given packet terminates the stream, i.e. it is
     * an error, eol or pong packet, so no more packets should be waited for.
     */
    private static boolean isLastPacket (BasicPacket packet) {
        return packet instanceof com.yahoo.fs4.ErrorPacket
               || packet instanceof com.yahoo.fs4.EolPacket
               || packet instanceof com.yahoo.fs4.PongPacket;
    }

    /**
     * Return the next available packet from the response queue.  If there
     * are no packets available we wait a maximum of <code>timeout</code>
     * milliseconds before returning a <code>null</code>
     *
     * @param timeout Number of milliseconds to wait for a packet
     *                to become available.
     *
     * @return Returns the next available <code>BasicPacket</code> or
     *         <code>null</code> if we timed out.
     * @throws InterruptedException if interrupted while waiting
     * @throws InvalidChannelException if the channel has been closed
     */
    public BasicPacket nextPacket(long timeout)
        throws InterruptedException, InvalidChannelException
    {
        return ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Add incoming packet to the response queue.  This is to be used
     * by the listener for placing incoming packets in the response
     * queue. The response monitor, if one is set, is notified afterwards.
     *
     * @param packet BasicPacket to be placed in the response queue.
     * @throws InterruptedException if interrupted while enqueueing
     * @throws InvalidChannelException if the channel has been closed
     */
    protected void addPacket (BasicPacket packet)
        throws InterruptedException, InvalidChannelException
    {
        ensureValidQ().put(packet);
        notifyMonitor();
    }

    /**
     * A valid FS4Channel is one that has not yet been closed.
     *
     * @return Returns <code>true</code> if the FS4Channel is valid.
     */
    public boolean isValid () {
        return responseQueue != null;
    }

    /**
     * This method is called whenever we want to perform an operation
     * which assumes that the FS4Channel object is valid.  An exception
     * is thrown if the opposite turns out to be the case.
     *
     * @throws InvalidChannelException if the channel is no longer valid.
     */
    private void ensureValid () throws InvalidChannelException {
        if (!isValid()) {
            throw new InvalidChannelException("Channel is no longer valid");
        }
    }

    /**
     * Same check as {@link #ensureValid()}, but hands back the response
     * queue that was observed. Callers use the returned reference instead
     * of re-reading the field, which {@link #close()} may null out at any
     * time.
     *
     * @return the current, non-null response queue
     * @throws InvalidChannelException if the channel is no longer valid.
     */
    private BlockingQueue<BasicPacket> ensureValidQ () throws InvalidChannelException {
        BlockingQueue<BasicPacket> q = responseQueue;
        if (q == null) {
            throw new InvalidChannelException("Channel is no longer valid");
        }
        return q;
    }

    /** Returns the channel id and whether the channel is still valid. */
    @Override
    public String toString() {
        return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]");
    }

    /**
     * Sets the monitor to be told when a packet has been added to this
     * channel's response queue. May be <code>null</code> for no notification.
     */
    public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) {
        this.monitor = monitor;
    }

    /** Tells the response monitor, if any, that a response is available on this channel. */
    protected void notifyMonitor() {
        // Read the field once so the null check and the call see the same value.
        ResponseMonitor<FS4Channel> current = monitor;
        if (current != null) {
            current.responseAvailable(this);
        }
    }
}