blob: b667b0f86dab13fd2a457edef1575fd9da228ea4 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "destinationsession.h"
#include "messagebus.h"
#include "emptyreply.h"
#include <cassert>
namespace mbus {
DestinationSession::DestinationSession(MessageBus &mbus, const DestinationSessionParams ¶ms) :
_mbus(mbus),
_name(params.getName()),
_msgHandler(params.getMessageHandler()),
_session_registered(!params.defer_registration()),
_broadcast_name(params.getBroadcastName())
{ }
DestinationSession::~DestinationSession() {
close();
}
void
DestinationSession::register_session_deferred() {
assert(!_session_registered);
_mbus.register_session(*this, _name, _broadcast_name);
_session_registered = true;
}
void
DestinationSession::close() {
if (_session_registered) {
_mbus.unregisterSession(_name);
_mbus.sync();
_session_registered = false;
}
}
void
DestinationSession::acknowledge(Message::UP msg) {
Reply::UP ack(new EmptyReply());
ack->swapState(*msg);
reply(std::move(ack));
}
void
DestinationSession::reply(Reply::UP ret) {
IReplyHandler &handler = ret->getCallStack().pop(*ret);
handler.handleReply(std::move(ret));
}
void
DestinationSession::handleMessage(Message::UP msg) {
_msgHandler.handleMessage(std::move(msg));
}
const string
DestinationSession::getConnectionSpec() const {
return _mbus.getConnectionSpec() + "/" + _name;
}
} // namespace mbus
|