blob: 021e58983c158dc29409c752fd7885287bbd5a6e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "reply.h"
#include "imessagehandler.h"
#include "intermediatesessionparams.h"
#include "replygate.h"
namespace mbus {
class MessageBus;
class Message;
/**
 * An IntermediateSession is used to process Message and Reply objects
 * on the way along a route.
 *
 * Instances are created by MessageBus only (the constructor is private and
 * MessageBus is a friend). The session itself implements both
 * IMessageHandler and IReplyHandler.
 **/
class IntermediateSession : public IMessageHandler,
                            public IReplyHandler
{
private:
    friend class MessageBus;
    using MessageUP = std::unique_ptr<Message>;
    template <typename T> using ref_counted = vespalib::ref_counted<T>;

    // NOTE: the declaration order below is also the member initialization
    // order; keep it in sync with the constructor's initializer list.
    MessageBus             &_mbus;         // The message bus that created this session.
    string                  _name;         // Name of this session (part of the connection spec).
    IMessageHandler        &_msgHandler;   // Message handler associated with this session.
    IReplyHandler          &_replyHandler; // Reply handler associated with this session.
    ref_counted<ReplyGate>  _gate;         // Reference-counted handle to a ReplyGate.

    /**
     * This constructor is private since only MessageBus (declared a friend
     * above) is supposed to instantiate sessions.
     *
     * @param mbus   The message bus that created this instance.
     * @param params The parameter object for this session.
     */
    IntermediateSession(MessageBus &mbus, const IntermediateSessionParams &params);

public:
    /**
     * Convenience typedefs.
     */
    using UP = std::unique_ptr<IntermediateSession>;

    /**
     * The destructor untangles from messagebus. After this method returns,
     * messagebus will not invoke any handlers associated with this session.
     */
    virtual ~IntermediateSession();

    /**
     * This method unregisters this session from message bus, effectively
     * disabling any more messages from being delivered to the message
     * handler. After unregistering, this method calls sync() on the owning
     * MessageBus to ensure that there are no threads currently entangled in
     * the handler.
     *
     * This method will deadlock if you call it from the message or reply
     * handler.
     */
    void close();

    /**
     * Forwards a routable to the next hop in its route. This method will
     * never block.
     *
     * @param routable The routable to forward.
     */
    void forward(Routable::UP routable);

    /**
     * Convenience overload of forward(Routable::UP) for messages.
     *
     * @param msg The message to forward.
     */
    void forward(MessageUP msg);

    /**
     * Convenience overload of forward(Routable::UP) for replies.
     *
     * @param reply The reply to forward.
     */
    void forward(Reply::UP reply);

    /**
     * Returns the connection spec string for this session. This returns a
     * combination of the owning message bus' own spec string and the name of
     * this session.
     *
     * NOTE(review): the top-level const on a by-value return type prevents
     * callers from moving out of the result; it is kept here so that the
     * declaration stays in agreement with the out-of-line definition.
     *
     * @return The connection string.
     */
    const string getConnectionSpec() const;

    /**
     * Inherited message handler entry point.
     *
     * @param message The message handed to this session; ownership is transferred.
     */
    void handleMessage(MessageUP message) override;

    /**
     * Inherited reply handler entry point.
     *
     * @param reply The reply handed to this session; ownership is transferred.
     */
    void handleReply(Reply::UP reply) override;
};
} // namespace mbus
|