aboutsummaryrefslogtreecommitdiffstats
path: root/fnet/src/vespa/fnet/packetqueue.h
blob: d631996cd49963edab9098ac12869ce383cff024 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "ipackethandler.h"
#include <mutex>
#include <condition_variable>

/**
 * This class implements a queue of packets. Being in a queue does not
 * affect the packet's internal data. This is the superclass of the
 * @ref FNET_PacketQueue. As seen by its name, this class has no
 * locking. All functionallity offered by this class is also available
 * in the subclass. However, this class may be a good lightweight
 * alternative to the heavier subclass in single threaded applications
 * or where the surrounding code handles thread-safety.
 **/
class FNET_PacketQueue_NoLock : public FNET_IPacketHandler
{
private:
    FNET_PacketQueue_NoLock(const FNET_PacketQueue_NoLock &);
    FNET_PacketQueue_NoLock &operator=(const FNET_PacketQueue_NoLock &);


protected:
#ifndef IAM_DOXYGEN
    struct _QElem
    {
        FNET_Packet  *_packet;
        FNET_Context  _context;
    };
#endif // DOXYGEN

    _QElem      *_buf;
    uint32_t     _bufsize;
    uint32_t     _bufused;
    uint32_t     _in_pos;
    uint32_t     _out_pos;
    HP_RetCode   _hpRetCode; // HandlePacket return value


    /**
     * Ensure that we have enough free space on the queue. Calling this
     * method will expand the queue by calling the ExpandBuf method if
     * there is insufficient free space.
     *
     * @param needentries the number of free packet entries needed.
     **/
    void EnsureFree(uint32_t needentries = 1)
    {
        if (_bufsize < _bufused + needentries)
            ExpandBuf(needentries);
    }


    /**
     * Expand the buffer capacity of this queue.
     *
     * @param needentries the number of free packet entries needed.
     **/
    void ExpandBuf(uint32_t needentries);


public:

    /**
     * Construct a packet queue.
     *
     * @param len initial number of free packet entries. Default is 64.
     * @param hpRetCode the value that should be returned when used
     *                  as a packet handler. Default is FNET_KEEP_CHANNEL.
     **/
    FNET_PacketQueue_NoLock(uint32_t len = 64,
                            HP_RetCode hpRetCode = FNET_KEEP_CHANNEL);
    virtual ~FNET_PacketQueue_NoLock();


    /**
     * Handle incoming packet by putting it on the queue. This method
     * uses the hpRetCode value given to the constructor to decide what
     * to do with the channel delivering the packet.
     *
     * @return channel command: keep open, close or free.
     * @param packet the packet to handle.
     * @param context the packet context.
     **/
    HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override;


    /**
     * Queue a packet. NOTE: packet handover (caller TO invoked object).
     *
     * @param packet the packet you want to queue.
     * @param context the context for the packet.
     **/
    void QueuePacket_NoLock(FNET_Packet *packet, FNET_Context context);


    /**
     * Check if the queue is empty.
     *
     * @return true if empty, false otherwise.
     **/
    bool IsEmpty_NoLock() { return _bufused == 0; }


    /**
     * Obtain the number of packets on the queue.
     *
     * @return number of packets on the queue.
     **/
    uint32_t GetPacketCnt_NoLock() { return _bufused; }


    /**
     * Remove the first packet from the queue and return it. If the
     * queue was empty, nullptr is returned. NOTE: packet handover (invoked
     * object TO caller).
     *
     * @return first packet in queue or nullptr.
     * @param context where to store the packet context.
     **/
    FNET_Packet *DequeuePacket_NoLock(FNET_Context *context);


    /**
     * Move all packets currently in this packet queue into the queue
     * given as parameter. NOTE: caller should have exclusive access to
     * the packet queue given as parameter.
     *
     * @return number of packets flushed.
     * @param target where to flush the packets.
     **/
    uint32_t FlushPackets_NoLock(FNET_PacketQueue_NoLock *target);


    /**
     * This method is called by the destructor to discard (invoke Free
     * on) all packets in this packet queue. This method is also called
     * by the FNET_Connection::Close method in order to get rid of the
     * packets in the output queue as soon as possible.
     **/
    void DiscardPackets_NoLock();


    /**
     * Print the contents of this packet queue to stdout. Useful for
     * debugging purposes.
     **/
    void Print(uint32_t indent = 0);
};


//------------------------------------------------------------------


/**
 * This class implements a queue of packets. Being in a queue does not
 * affect the packet's internal data. This is an extension of the @ref
 * FNET_PacketQueue_NoLock class that also supports thread-safe
 * operations. The indirectly inherited packethandler callback method
 * and the print method are overridden to support thread-safe
 * behavior.
 **/
class FNET_PacketQueue : public FNET_PacketQueue_NoLock
{
private:
    FNET_PacketQueue(const FNET_PacketQueue &);
    FNET_PacketQueue &operator=(const FNET_PacketQueue &);


protected:
    std::mutex              _lock;
    std::condition_variable _cond;
    uint32_t                _waitCnt;


public:

    /**
     * Construct a packet queue.
     *
     * @param len initial number of free packet entries. Default is 64.
     * @param hpRetCode the value that should be returned when used
     *                  as a packet handler. Default is FNET_KEEP_CHANNEL.
     **/
    FNET_PacketQueue(uint32_t len = 64, HP_RetCode hpRetCode = FNET_KEEP_CHANNEL);
    ~FNET_PacketQueue();

    /**
     * Handle incoming packet by putting it on the queue. This method
     * uses the hpRetCode value given to the constructor to decide what
     * to do with the channel delivering the packet.
     *
     * @return channel command: keep open, close or free.
     * @param packet the packet to handle.
     * @param context the packet context.
     **/
    HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override;


    /**
     * Insert a packet into this packet queue. If the queue is too small
     * it will be extended automatically. NOTE: packet handover (caller
     * TO invoked object).
     *
     * @param packet packet you want to queue.
     * @param context the context for the packet.
     **/
    void QueuePacket(FNET_Packet *packet, FNET_Context context);


    /**
     * Obtain the first packet in this packet queue. If the queue is
     * currently empty, the calling thread will wait until a packet is
     * available on the queue. NOTE: packet handover (invoked object TO
     * caller)
     *
     * @return a packet obtained from this queue
     * @param context where to store the packet context.
     **/
    FNET_Packet *DequeuePacket(FNET_Context *context);


    /**
     * Obtain the first packet in this packet queue. If the queue is
     * currently empty, the calling thread will wait until a packet is
     * available on the queue, but for no more than 'maxwait'
     * milliseconds. NOTE: packet handover (invoked object TO caller)
     *
     * @return a packet obtained from the queue or nullptr.
     * @param maxwait maximum number of milliseconds before this
     *        method call returns.
     * @param context where to store packet context.
     **/
    FNET_Packet *DequeuePacket(uint32_t maxwait,
                               FNET_Context *context);


    /**
     * Print the contents of this packet queue to stdout. Useful for
     * debugging purposes.
     **/
    void Print(uint32_t indent = 0);
};