aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/messagebus.h
blob: aa926a774d0b202e5541ea905414e242b5fe555b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "destinationsession.h"
#include "iconfighandler.h"
#include "idiscardhandler.h"
#include "intermediatesession.h"
#include "messagebusparams.h"
#include "protocolset.h"
#include "sourcesession.h"
#include <vespa/messagebus/network/inetworkowner.h>
#include <vespa/messagebus/routing/routingspec.h>
#include <map>
#include <string>
#include <atomic>

namespace mbus {

class SendProxy;
class Messenger;
class Resender;
class INetwork;
class RoutingTable;
class ProtocolRepository;

/**
 * A MessageBus object combined with an INetwork implementation makes up the central part of a messagebus setup. It is
 * important that the application destructs all sessions before destructing the MessageBus object. Also, the INetwork
 * object should be destructed after the MessageBus object.
 */
class MessageBus : public IMessageHandler,
                   public IConfigHandler,
                   public IDiscardHandler,
                   public INetworkOwner,
                   public IReplyHandler
{
private:
    using RoutingTableSP = std::shared_ptr<RoutingTable>;
    INetwork                            &_network;
    std::mutex                           _lock;
    std::map<string, RoutingTableSP>     _routingTables;
    std::map<string, IMessageHandler*>   _sessions;
    std::unique_ptr<ProtocolRepository>  _protocolRepository;
    std::unique_ptr<Messenger>           _msn;
    std::unique_ptr<Resender>            _resender;
    std::atomic<uint32_t>                _maxPendingCount;
    std::atomic<uint32_t>                _maxPendingSize;
    std::atomic<uint32_t>                _pendingCount;
    std::atomic<uint32_t>                _pendingSize;

    /**
     * This method performs the common constructor tasks.
     *
     * @param params The parameters to base setup on.
     */
    void setup(const MessageBusParams &params);

    /**
     * This method handles choking input data so that message bus does not blindly accept everything. This prevents an
     * application running out-of-memory in case it fail to choke input data itself. If this method returns false, it
     * means that it should be rejected.
     *
     * @param msg The message to count.
     * @return True if the message was accepted.
     */
    bool checkPending(Message &msg);

    /**
     * Constructs and schedules a Reply containing an error to the handler of the given Message.
     *
     * @param msg     The message to reply to.
     * @param errCode The code of the error to set.
     * @param errMsg  The message of the error to set.
     */
    void deliverError(Message::UP msg, uint32_t errCode, const string &errMsg);

public:
    /**
     * Convenience constructor that proxies {@link this#MessageBus(Network, MessageBusParams)} by adding the given
     * protocols to a default {@link MessageBusParams} object.
     *
     * @param network   The network to associate with.
     * @param protocols An array of protocols to register.
     */
    MessageBus(INetwork &net, ProtocolSet protocols);

    /**
     * Constructs an instance of message bus. This requires a network object that it will associate with. This
     * assignment may not change during the lifetime of this message bus.
     *
     * @param network The network to associate with.
     * @param params  The parameters that controls this bus.
     */
    MessageBus(INetwork &net, const MessageBusParams &params);

    /**
     * Destruct. The destructor will shut down the underlying INetwork object.
     **/
    virtual ~MessageBus();

    /**
     * This is a convenience method to call {@link this#createSourceSession(SourceSessionParams)} with default
     * values for the {@link SourceSessionParams} object.
     *
     * @param handler The reply handler to receive the replies for the session.
     * @return The created session.
     */
    SourceSession::UP createSourceSession(IReplyHandler &handler);

    /**
     * This is a convenience method to call {@link this#createSourceSession(SourceSessionParams)} by first
     * assigning the reply handler to the parameter object.
     *
     * @param handler The reply handler to receive the replies for the session.
     * @param params  The parameters to control the session.
     * @return The created session.
     */
    SourceSession::UP createSourceSession(IReplyHandler &handler,
                                          const SourceSessionParams &params);

    /**
     * Creates a source session on top of this message bus.
     *
     * @param params The parameters to control the session.
     * @return The created session.
     */
    SourceSession::UP createSourceSession(const SourceSessionParams &params);

    /**
     * This is a convenience method to call {@link this#createIntermediateSession(IntermediateSessionParams)} with
     * default values for the {@link IntermediateSessionParams} object.
     *
     * @param name          The local unique name for the created session.
     * @param broadcastName Whether or not to broadcast this session's name on the network.
     * @param msgHandler    The handler to receive the messages for the session.
     * @param replyHandler  The handler to received the replies for the session.
     * @return The created session.
     */
    IntermediateSession::UP createIntermediateSession(const string &name,
                                                      bool broadcastName,
                                                      IMessageHandler &msgHandler,
                                                      IReplyHandler &replyHandler);

    /**
     * Creates an intermediate session on top of this message bus using the given handlers and parameter object.
     *
     * @param params The parameters to control the session.
     * @return The created session.
     */
    IntermediateSession::UP createIntermediateSession(const IntermediateSessionParams &params);

    /**
     * This is a convenience method to call {@link this#createDestinationSession(DestinationSessionParams)} with default
     * values for the {@link DestinationSessionParams} object.
     *
     * @param name          The local unique name for the created session.
     * @param broadcastName Whether or not to broadcast this session's name on the network.
     * @param handler       The handler to receive the messages for the session.
     * @return The created session.
     */
    DestinationSession::UP createDestinationSession(const string &name,
                                                    bool broadcastName,
                                                    IMessageHandler &handler);

    /**
     * Creates a destination session on top of this message bus using the given handlers and parameter object.
     *
     * @param params The parameters to control the session.
     * @return The created session.
     */
    DestinationSession::UP createDestinationSession(const DestinationSessionParams &params);

    /**
     * Register a session; used by session instances that are created with deferred registration.
     * Don't use this directly.
     */
    void register_session(IMessageHandler& session, const string& session_name, bool broadcast_name);

    /**
     * Unregister a session. This method is invoked by session destructors to ensure that no more Message objects are
     * delivered and that the session name is removed from the network naming service. The sync method can be invoked
     * after invoking this one to ensure that no callbacks are active.
     *
     * @param sessionName name of the session to unregister
     **/
    void unregisterSession(const string &sessionName);

    /**
     * Obtain the routing table for the given protocol. If the appropriate routing table could not be found, a shared
     * pointer to 0 is returned.
     *
     * @return shared pointer to routing table
     * @param protocol the protocol name
     **/
    RoutingTableSP getRoutingTable(const string &protocol);

    /**
     * Returns a routing policy that corresponds to the argument protocol name, policy name and policy parameter. This
     * will cache reuse all policies as soon as they are first requested.
     *
     * @param protocol    The name of the protocol to invoke {@link Protocol#createPolicy(String,String)} on.
     * @param policyName  The name of the routing policy to retrieve.
     * @param policyParam The parameter for the routing policy to retrieve.
     * @return A corresponding routing policy, or null.
     */
    IRoutingPolicy::SP getRoutingPolicy(const string &protocol, const string &policyName,
                                        const string &policyParam);

    /**
     * Synchronize with internal threads. This method will handshake with all internal threads. This has the implicit
     * effect of waiting for all active callbacks. Note that this method should never be invoked from a callback since
     * that would make the thread wait for itself... forever. This method is typically used to untangle during session
     * destruction.
     **/
    void sync();

    /**
     * Returns the resender that is running within this message bus.
     *
     * @return The resender.
     */
    Resender *getResender() { return _resender.get(); }

    /**
     * Returns the number of messages received that have not been replied to yet.
     *
     * @return The pending count.
     */
    uint32_t getPendingCount() const { return _pendingCount; }

    /**
     * Returns the size of messages received that have not been replied to yet.
     *
     * @return The pending size.
     */
    uint32_t getPendingSize() const { return _pendingSize; }

    /**
     * Sets the maximum number of messages that can be received without being replied to yet.
     *
     * @param maxCount The max count.
     */
    void setMaxPendingCount(uint32_t maxCount);

    /**
     * Gets maximum number of messages that can be received without being
     * replied to yet.
     */
    uint32_t getMaxPendingCount() const noexcept {
        return _maxPendingCount.load(std::memory_order_relaxed);
    }

    /**
     * Sets the maximum size of messages that can be received without being replied to yet.
     *
     * @param maxSize The max size.
     */
    void setMaxPendingSize(uint32_t maxSize);

    /**
     * Gets maximum combined size of messages that can be received without
     * being replied to yet.
     */
    uint32_t getMaxPendingSize() const noexcept {
        return _maxPendingSize.load(std::memory_order_relaxed);
    }

    /**
     * Adds a protocol to the internal repository of protocols, replacing any previous instance of the
     * protocol and clearing the associated routing policy cache.
     *
     * @param protocol The protocol to add.
     */
    IProtocol::SP putProtocol(const IProtocol::SP & protocol);

    /**
     * Returns the connection spec string for the network layer of this message bus. This is merely a proxy of
     * the same function in the network layer.
     *
     * @return The connection string.
     */
    string getConnectionSpec() const;

    /**
     * Provide access to the underlying {@link Messenger} object.
     *
     * @return The underlying {@link Messenger} object.
     */
    Messenger & getMessenger() { return *_msn; }

    // Implements IReplyHandler.
    void handleReply(Reply::UP reply) override;

    // Implements IDiscardHandler.
    void handleDiscard(Context ctx) override;

    // Implements IMessageHandler.
    void handleMessage(Message::UP msg) override;

    // Implements IConfigHandler.
    bool setupRouting(RoutingSpec spec) override;

    // Implements INetworkOwner.
    IProtocol * getProtocol(const string &name) override;

    // Implements INetworkOwner.
    void deliverMessage(Message::UP msg, const string &session) override;

    // Implements INetworkOwner.
    void deliverReply(Reply::UP reply, IReplyHandler &handler) override;
};

} // namespace mbus