1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "sentmessagemap.h"
#include "distributormessagesender.h"
#include "operationstarter.h"
// Forward declaration only: this header takes and stores framework::Clock
// solely by const reference, so the complete type is not needed here.
namespace storage::framework { struct Clock; }
namespace storage::distributor {
class CancelScope;
class Operation;
/**
 * Storage link that keeps track of running operations.
 *
 * Holds a SentMessageMap together with references to the message sender and
 * the clock passed to the constructor. Both are stored by reference and are
 * not owned by this object.
 */
class OperationOwner : public OperationStarter {
public:
    /**
     * DistributorStripeMessageSender bound to one OperationOwner and one
     * Operation.
     *
     * Every accessor defined inline below simply delegates to the wrapped
     * sender. sendCommand() and sendReply() are defined out of line.
     */
    class Sender : public DistributorStripeMessageSender {
    public:
        /**
         * @param owner  the OperationOwner this sender belongs to (stored by reference)
         * @param sender the underlying sender that calls are delegated to (stored by reference)
         * @param cb     the operation associated with this sender; a copy of the
         *               shared_ptr is kept for the lifetime of this object
         */
        Sender(OperationOwner& owner,
               DistributorStripeMessageSender& sender,
               const std::shared_ptr<Operation>& cb)
            : _owner(owner),
              _sender(sender),
              _cb(cb)
        {}

        // Defined out of line.
        // NOTE(review): presumably records the command against _cb in the owner's
        // sent message map before passing it on - confirm in the implementation.
        void sendCommand(const std::shared_ptr<api::StorageCommand> &) override;
        void sendReply(const std::shared_ptr<api::StorageReply> & msg) override;

        /** Returns the OperationOwner given at construction. */
        OperationOwner& getOwner() {
            return _owner;
        }

        // --- Pure delegation to the wrapped sender ---

        int getDistributorIndex() const override {
            return _sender.getDistributorIndex();
        }

        const ClusterContext & cluster_context() const override {
            return _sender.cluster_context();
        }

        PendingMessageTracker& getPendingMessageTracker() override {
            return _sender.getPendingMessageTracker();
        }

        const PendingMessageTracker& getPendingMessageTracker() const override {
            return _sender.getPendingMessageTracker();
        }

        const OperationSequencer& operation_sequencer() const noexcept override {
            return _sender.operation_sequencer();
        }

        OperationSequencer& operation_sequencer() noexcept override {
            return _sender.operation_sequencer();
        }

    private:
        OperationOwner& _owner;                   // not owned
        DistributorStripeMessageSender& _sender;  // not owned; target of all delegated calls
        std::shared_ptr<Operation> _cb;           // shared ownership of the associated operation
    };

    /**
     * @param sender sender exposed through sender(); stored by reference
     * @param clock  clock stored by reference
     */
    OperationOwner(DistributorStripeMessageSender& sender,
                   const framework::Clock& clock)
        : _sender(sender),
          _clock(clock) {
    }
    ~OperationOwner() override;

    /**
     * Handles replies from storage, mapping from a message id to an operation.
     *
     * NOTE(review): an earlier version of this comment described a pair-like
     * result ("result.first"), which does not match the current signature; only
     * a bool is returned. Presumably `true` means the reply was matched to a
     * tracked operation and handed to it - confirm against the implementation.
     */
    bool handleReply(const std::shared_ptr<api::StorageReply>& reply);

    /** Mutable access to the underlying message-id -> operation mapping. */
    SentMessageMap& getSentMessageMap() {
        return _sentMessageMap;
    }

    /** OperationStarter override; defined out of line. */
    bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;

    /**
     * If the given message exists, remove it from the internal operation mapping.
     *
     * Returns the operation the message belonged to, if any.
     */
    [[nodiscard]] std::shared_ptr<Operation> erase(api::StorageMessage::Id msgId);

    /**
     * Returns a strong ref to the pending operation with the given msg_id if it exists.
     * Otherwise returns an empty shared_ptr.
     */
    [[nodiscard]] std::shared_ptr<Operation> find_by_id(api::StorageMessage::Id msg_id) const noexcept;

    // NOTE(review): presumably returns true iff an operation was found for msg_id
    // and cancellation was applied with the given scope - confirm in the implementation.
    [[nodiscard]] bool try_cancel_by_id(api::StorageMessage::Id msg_id, const CancelScope& cancel_scope);

    /** Returns the sender given at construction. */
    [[nodiscard]] DistributorStripeMessageSender& sender() noexcept { return _sender; }

    // Defined out of line.
    void onClose();

    /** Returns _sentMessageMap.size(), converted to uint32_t. */
    uint32_t size() const { return _sentMessageMap.size(); }

    std::string toString() const;

private:
    SentMessageMap _sentMessageMap;
    DistributorStripeMessageSender& _sender;  // not owned
    const framework::Clock& _clock;           // not owned
};
}
|