aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/sourcesession.h
blob: 03628f06c56b1ea1bd7da60bcc7019fde5688aef (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "ireplyhandler.h"
#include "result.h"
#include "sequencer.h"
#include "sourcesessionparams.h"
#include "replygate.h"
#include <atomic>
#include <condition_variable>

namespace mbus {

// Forward declaration: this header only names MessageBus as a friend and refers to it by reference.
class MessageBus;

/**
 * A SourceSession is used to send Message objects along a named or explicitly defined route and get Reply
 * objects back. A source session does not have a service name and can only receive replies to the messages
 * sent on it.
 *
 * The constructor is private and MessageBus is declared a friend, so instances are created by MessageBus
 * rather than directly by client code.
 **/
class SourceSession : public IReplyHandler {
private:
    friend class MessageBus;
    template <typename T> using ref_counted = vespalib::ref_counted<T>;

    // NOTE(review): the exact role of each member is defined by the implementation file, which is not
    // visible from this header; the notes below are limited to what the declarations here show.
    //  - _lock / _cond:  mutex and condition variable; presumably used to coordinate close() with
    //                    outstanding replies -- confirm against the implementation.
    //  - _replyHandler:  the handler exposed through getReplyHandler().
    //  - _timeout:       the value configured through setTimeout().
    //  - _pendingCount:  the counter reported by getPendingCount().
    //  - _closed / _done: presumably track the progress of close() -- confirm against the implementation.
    std::mutex              _lock;
    std::condition_variable _cond;
    MessageBus             &_mbus;
    ref_counted<ReplyGate>  _gate;
    Sequencer               _sequencer;
    IReplyHandler          &_replyHandler;
    IThrottlePolicy::SP     _throttlePolicy;
    duration                _timeout;
    std::atomic<uint32_t>   _pendingCount;
    bool                    _closed;
    bool                    _done;

private:
    /**
     * This is the private constructor used by mbus to create source sessions. It expects all arguments but
     * the {@link SourceSessionParams} to be proper, so no checks are performed.
     *
     * @param mbus   The message bus that created this instance.
     * @param params A parameter object that holds configuration parameters.
     */
    SourceSession(MessageBus &mbus, const SourceSessionParams &params);

public:
    /**
     * Convenience alias for a unique pointer to a SourceSession object.
     **/
    using UP = std::unique_ptr<SourceSession>;

    /**
     * The destructor untangles from messagebus. This is safe, but you will lose the replies of all pending
     * messages. After this method returns, messagebus will not invoke any handlers associated with this
     * session.
     **/
    ~SourceSession() override;

    /**
     * This is a convenience function to assign a named route to the given message, and then pass it to the
     * other {@link #send(Message)} method of this session. If the route could not be found this method
     * returns with an appropriate error, unless the 'parseIfNotFound' argument is true. In that case, the
     * route name is passed through to the Route factory method {@link Route#create}.
     *
     * @param msg             The message to send.
     * @param routeName       The name of the route to assign to the message.
     * @param parseIfNotFound Whether or not to parse routeName as a route if it could not be found.
     *                        Defaults to false.
     * @return The immediate result of the attempt to send this message.
     */
    Result send(Message::UP msg, const string &routeName, bool parseIfNotFound = false);

    /**
     * This is a convenience function to assign a given route to the given message, and then pass it to the
     * other {@link #send(Message)} method of this session.
     *
     * @param msg   The message to send.
     * @param route The route to assign to the message.
     * @return The immediate result of the attempt to send this message.
     */
    Result send(Message::UP msg, const Route &route);

    /**
     * Send a Message along a route that has already been specified in the message object.
     *
     * @param msg The message to send.
     * @return The immediate result of the attempt to send this message.
     */
    Result send(Message::UP msg);

    /**
     * Handle a Reply obtained from messagebus. This is the IReplyHandler callback of this session.
     *
     * @param reply The reply to handle.
     **/
    void handleReply(Reply::UP reply) override;

    /**
     * Close this session. This method will block until Reply objects have been obtained for all pending
     * Message objects. Also, no more Message objects will be accepted by this session after closing has
     * been initiated.
     **/
    void close();

    /**
     * Returns the reply handler of this session.
     *
     * @return The reply handler.
     */
    IReplyHandler &getReplyHandler() { return _replyHandler; }

    /**
     * Returns the number of messages sent that have not been replied to yet.
     *
     * @return The pending count.
     */
    [[nodiscard]] uint32_t getPendingCount() const noexcept {
        // Relaxed load: this reads the counter atomically but imposes no ordering on surrounding memory
        // operations, so the result is only a point-in-time snapshot.
        return _pendingCount.load(std::memory_order_relaxed);
    }

    /**
     * Sets how long a message can be attempted sent before it times out.
     *
     * @param timeout The time allowed, given as a duration value.
     * @return This, to allow chaining.
     */
    SourceSession &setTimeout(duration timeout);
};

} // namespace mbus