aboutsummaryrefslogtreecommitdiffstats
path: root/fnet/src/vespa/fnet/transport_thread.h
blob: c7ada4725015f0862486513c2a1b61099086f787 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "scheduler.h"
#include "config.h"
#include "task.h"
#include "packetqueue.h"
#include <vespa/vespalib/net/socket_handle.h>
#include <vespa/vespalib/net/selector.h>
#include <vespa/vespalib/util/thread.h>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <set>

namespace fnet { struct TimeTools; }
class FNET_Transport;
class FNET_ControlPacket;
class FNET_IPacketStreamer;
class FNET_IServerAdapter;

/**
 * This class represents a transport thread and handles a subset of
 * the network related work for the application in both client and
 * server aspects.
 **/
class FNET_TransportThread
{
    friend class FNET_IOComponent;

public:
    using Selector = vespalib::Selector<FNET_IOComponent>;

private:
    FNET_Transport          &_owner;          // owning transport layer
    vespalib::steady_time    _now;            // current time sampler
    FNET_Scheduler           _scheduler;      // transport thread scheduler
    FNET_IOComponent        *_componentsHead; // I/O component list head
    FNET_IOComponent        *_timeOutHead;    // first IOC in list to time out
    FNET_IOComponent        *_componentsTail; // I/O component list tail
    std::atomic<uint32_t>    _componentCnt;   // # of components
    FNET_IOComponent        *_deleteList;     // IOC delete list
    Selector                 _selector;       // I/O event generator
    FNET_PacketQueue_NoLock  _queue;          // outer event queue
    FNET_PacketQueue_NoLock  _myQueue;        // inner event queue
    std::mutex               _lock;           // protects the Q
    std::mutex               _shutdownLock;   // used for synchronization during shutdown
    std::condition_variable  _shutdownCond;   // used for synchronization during shutdown
    std::atomic<bool>        _started;        // event loop started ?
    std::atomic<bool>        _shutdown;       // should stop event loop ?
    std::atomic<bool>        _finished;       // event loop stopped ?
    std::set<FNET_IServerAdapter*> _detaching; // server adapters being detached
    bool _reject_events; // the transport thread does not want any more events

    /**
     * Add an IOComponent to the list of components. This operation is
     * performed immidiately and without locking. This method should
     * only be called in the transport thread.
     *
     * @param comp the component to add.
     **/
    void AddComponent(FNET_IOComponent *comp);


    /**
     * Remove an IOComponent from the list of components. This operation is
     * performed immidiately and without locking. This method should
     * only be called in the transport thread.
     *
     * @param comp the component to remove.
     **/
    void RemoveComponent(FNET_IOComponent *comp);


    /**
     * Update time-out information for the given I/O component. This
     * method may only be called in the transport thread. Calling this
     * method will update the timestamp on the given IOC and perform a
     * remove/add IOC operation, putting it last in the time-out queue.
     *
     * @param comp component to update time-out info for.
     **/
    void UpdateTimeOut(FNET_IOComponent *comp);


    /**
     * Add an IOComponent to the delete list. This operation is
     * performed immidiately and without locking. This method should
     * only be called in the transport thread. NOTE: the IOC must be
     * removed from the list of active components before this method may
     * be called.
     *
     * @param comp the component to add to the delete list.
     **/
    void AddDeleteComponent(FNET_IOComponent *comp);


    /**
     * Delete (call internal_subref on) all IO Components in the delete list.
     **/
    void FlushDeleteList();


    /**
     * Post an event (ControlPacket) on the transport thread event
     * queue. This is done to tell the transport thread that it needs to
     * do an operation that could not be performed in other threads due
     * to thread-safety. If the event queue is empty, invoking this
     * method will wake up the transport thread in order to reduce
     * latency. Note that when posting events that have a reference
     * counted object as parameter you need to increase the reference
     * counter to ensure that the object will not be deleted before the
     * event is handled.
     *
     * @return true if the event was accepted (false if rejected)
     * @param cpacket the event command
     * @param context the event parameter
     **/
    bool PostEvent(FNET_ControlPacket *cpacket, FNET_Context context);


    /**
     * Discard an event. This method is used to discard events that will
     * not be handled due to shutdown.
     *
     * @param cpacket the event command
     * @param context the event parameter
     **/
    void DiscardEvent(FNET_ControlPacket *cpacket, FNET_Context context);


    /**
     * Obtain a reference to the object holding the configuration for
     * this transport object.
     *
     * @return config object.
     **/
    const FNET_Config & getConfig() const;
    const fnet::TimeTools &time_tools() const;

    void handle_add_cmd(FNET_IOComponent *ioc);
    void handle_close_cmd(FNET_IOComponent *ioc);
    void handle_detach_server_adapter_init_cmd(FNET_IServerAdapter *server_adapter);
    void handle_detach_server_adapter_fini_cmd(FNET_IServerAdapter *server_adapter);

    /**
     * This method is called to initialize the transport thread event
     * loop. It is called from the FRT_Transport::Run method. If you
     * want to customize the event loop, you should do this by invoking
     * this method once, then invoke the @ref EventLoopIteration method
     * until it returns false (indicating transport shutdown).
     *
     * @return true on success, false on failure.
     **/
    bool InitEventLoop();

    void endEventLoop();
    void checkTimedoutComponents(vespalib::duration timeout);

    /**
     * Perform a single transport thread event loop iteration. This
     * method is called by the FRT_Transport::Run method. If you want to
     * customize the event loop, you should do this by invoking the @ref
     * InitEventLoop method once, then invoke this method until it
     * returns false (indicating transport shutdown).
     *
     * @return true when active, false after shutdown.
     **/
    bool EventLoopIteration();

    [[nodiscard]] bool should_shut_down() const noexcept {
        return _shutdown.load(std::memory_order_relaxed);
    }

    [[nodiscard]] bool is_finished() const noexcept {
        return _finished.load(std::memory_order_acquire);
    }

public:
    FNET_TransportThread(const FNET_TransportThread &) = delete;
    FNET_TransportThread &operator=(const FNET_TransportThread &) = delete;
    /**
     * Construct a transport object. To activate your newly created
     * transport object you need to call either the Start method to
     * spawn a new thread to handle IO, or the Main method to let the
     * current thread become the transport thread.
     *
     * @param owner owning transport layer
     **/
    explicit FNET_TransportThread(FNET_Transport &owner_in);


    /**
     * Destruct object. This should NOT be done before the transport
     * thread has completed it's work and raised the finished flag.
     **/
    ~FNET_TransportThread();


    /**
     * Obtain the owning transport layer
     *
     * @return transport layer owning this transport thread
     **/
    FNET_Transport &owner() const { return _owner; }

    /**
     * Tune the given socket handle to be used as an async transport
     * connection.
     **/
    bool tune(vespalib::SocketHandle &handle) const;

    /**
     * Add a network listener in an abstract way. The given 'spec'
     * string has the following format: 'type/where'. 'type' specifies
     * the protocol used; currently only 'tcp' is allowed, but it is
     * included for future extensions. 'where' specifies where to listen
     * in a way depending on the 'type' field; with tcp this field holds
     * a port number. Example: listen for tcp/ip connections on port
     * 8001: spec = 'tcp/8001'. If you want to enable strict binding you
     * may supply a hostname as well, like this:
     * 'tcp/mycomputer.mydomain:8001'.
     *
     * @return the connector object, or nullptr if listen failed.
     * @param spec string specifying how and where to listen.
     * @param streamer custom packet streamer.
     * @param serverAdapter object for custom channel creation.
     **/
    FNET_Connector *Listen(const char *spec, FNET_IPacketStreamer *streamer,
                           FNET_IServerAdapter *serverAdapter);


    /**
     * Connect to a target host in an abstract way. The given 'spec'
     * string has the following format: 'type/where'. 'type' specifies
     * the protocol used; currently only 'tcp' is allowed, but it is
     * included for future extensions. 'where' specifies where to
     * connect in a way depending on the type field; with tcp this field
     * holds a host name (or IP address) and a port number. Example:
     * connect to www.fast.no on port 80 (using tcp/ip): spec =
     * 'tcp/www.fast.no:80'. The newly created connection will be
     * serviced by this transport layer object.
     *
     * @return an object representing the new connection.
     * @param spec string specifying how and where to connect.
     * @param streamer custom packet streamer.
     * @param serverAdapter adapter used to support 2way channel creation.
     * @param connContext application context for the connection.
     **/
    FNET_Connection *Connect(const char *spec, FNET_IPacketStreamer *streamer,
                             FNET_IServerAdapter *serverAdapter = nullptr,
                             FNET_Context connContext = FNET_Context());


    /**
     * This method may be used to determine how many IO Components are
     * currently controlled by this transport layer object. Note that
     * locking is not used, since this information is volatile anyway.
     *
     * @return the current number of IOComponents.
     **/
    uint32_t GetNumIOComponents() const noexcept {
        return _componentCnt.load(std::memory_order_relaxed);
    }

    /**
     * Add an I/O component to the working set of this transport
     * object. Note that the actual work is performed by the transport
     * thread. This method simply posts an event on the transport thread
     * event queue. NOTE: in order to post async events regarding I/O
     * components, an extra reference to the component needs to be
     * allocated. The needRef flag indicates wether the caller already
     * has done this.
     *
     * @param comp the component you want to add.
     * @param needRef should be set to false if the caller of this
     *        method already has obtained an extra reference to the
     *        component. If this flag is true, this method will call the
     *        internal_addref method on the component.
     **/
    void Add(FNET_IOComponent *comp, bool needRef = true);

    /**
     * Calling this method enables write events for the given I/O
     * component. Note that the actual work is performed by the
     * transport thread. This method simply posts an event on the
     * transport thread event queue. NOTE: in order to post async events
     * regarding I/O components, an extra reference to the component
     * needs to be allocated. The needRef flag indicates wether the
     * caller already has done this.
     *
     * @param comp the component that wants write events.
     * @param needRef should be set to false if the caller of this
     *        method already has obtained an extra reference to the
     *        component. If this flag is true, this method will call the
     *        internal_addref method on the component.
     **/
    void EnableWrite(FNET_IOComponent *comp, bool needRef = true);


    /**
     * Signal the completion of an asyncronous handshake operation for
     * the given io component. Note that the actual work is performed
     * by the transport thread. This method simply posts an event on
     * the transport thread event queue. NOTE: in order to post async
     * events regarding I/O components, an extra reference to the
     * component needs to be allocated. The needRef flag indicates
     * wether the caller already has done this.
     *
     * @param comp the component to signal about operation completion
     * @param needRef should be set to false if the caller of this
     *        method already has obtained an extra reference to the
     *        component. If this flag is true, this method will call the
     *        internal_addref method on the component.
     **/
    void handshake_act(FNET_IOComponent *comp, bool needRef = true);


    /**
     * Close an I/O component and remove it from the working set of this
     * transport object. Note that the actual work is performed by the
     * transport thread. This method simply posts an event on the
     * transport thread event queue. NOTE: in order to post async events
     * regarding I/O components, an extra reference to the component
     * needs to be allocated. The needRef flag indicates wether the
     * caller already has done this.
     *
     * @param comp the component you want to close (and remove).
     * @param needRef should be set to false if the caller of this
     *        method already has obtained an extra reference to the
     *        component. If this flag is true, this method will call the
     *        internal_addref method on the component.
     **/
    void Close(FNET_IOComponent *comp, bool needRef = true);

    /**
     * Start the operation of detaching a server adapter from this
     * transport.
     **/
    void init_detach(FNET_IServerAdapter *server_adapter);

    /**
     * Complete the operation of detaching a server adapter from this
     * transport.
     **/
    void fini_detach(FNET_IServerAdapter *server_adapter);

    /**
     * Post an execution event on the transport event queue. The return
     * value from this method indicate whether the execution request was
     * accepted or not. If it was accepted, the transport thread will
     * execute the given executable at a later time. However, if it was
     * rejected (this method returns false), the caller of this method
     * will need to handle the fact that the executor will never be
     * executed. Also note that it is the responsibility of the caller
     * to ensure that all needed context for the executor is kept alive
     * until the time of execution. It is ok to assume that execution
     * requests will only be rejected due to transport thread shutdown.
     *
     * @return true if the execution request was accepted, false if it was rejected
     * @param exe the executable we want to execute in the transport thread
     **/
    bool execute(FNET_IExecutable *exe);


    /**
     * Synchronize with the transport thread. This method will block
     * until all events posted before this method was invoked has been
     * processed. If the transport thread has been shut down (or is in
     * the progress of being shut down) this method will instead wait
     * for the transport thread to complete, since no more commands will
     * be performed, and waiting would be forever. Invoking this method
     * from the transport thread is not a good idea.
     **/
    void sync();


    /**
     * Obtain a pointer to the transport thread scheduler. This
     * scheduler may be used to schedule tasks to be run by the
     * transport thread.
     *
     * @return transport thread scheduler.
     **/
    FNET_Scheduler *GetScheduler() { return &_scheduler; }


    /**
     * Calling this method will shut down the transport layer in a nice
     * way. Note that the actual task of shutting down is performed by
     * the transport thread. This method simply posts an event on the
     * transport thread event queue telling it to shut down.
     *
     * @param waitFinished if this flag is set, the method call will not
     *        return until the transport layer is shut down. NOTE: do
     *        not set this flag if you are calling this method from a
     *        callback from the transport layer, as it will create a
     *        deadlock.
     **/
    void ShutDown(bool waitFinished);


    /**
     * This method will make the calling thread wait until the transport
     * layer has been shut down. NOTE: do not invoke this method if you
     * are in a callback from the transport layer, as it will create a
     * deadlock. See @ref ShutDown.
     **/
    void WaitFinished();


    // selector call-back for wakeup events
    void handle_wakeup();

    // selector call-back for io-events
    void handle_event(FNET_IOComponent &ctx, bool read, bool write);


    /**
     * Start transport layer operation in a separate thread. Note that
     * the return value of this method only indicates whether the
     * spawning of the new thread went ok.
     *
     * @return thread create status.
     * @param pool threadpool that may be used to spawn a new thread.
     **/
    bool Start(vespalib::ThreadPool &pool);


    /**
     * Calling this method will give the current thread to the transport
     * layer. The method will not return until the transport layer is
     * shut down by calling the @ref ShutDown method.
     **/
    void Main();


    /**
     * This is where the transport thread lives, when started by
     * invoking one of the @ref Main or @ref Start methods.
     **/
    void run();
};