blob: b395639ec2d686e346b445b21bc577cd824b7898 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include "iretrypolicy.h"
#include <vespa/messagebus/queue.h>
#include <vespa/messagebus/reply.h>
#include <mutex>
#include <queue>
#include <vector>
namespace mbus {
class RoutingNode;
/**
 * The resender handles the scheduling and execution of resending instances of
 * {@link RoutingNode}. An instance of this class is owned by {@link
 * MessageBus}. Because this class does not have any internal thread, it
 * depends on message bus to keep polling it whenever it has time.
 */
class Resender
{
private:
    using time_point = std::chrono::steady_clock::time_point;

    // One queued retry: a point in time paired with the routing node to
    // resend. NOTE(review): the time point is presumably the earliest moment
    // at which the node may be resent -- confirm against the implementation.
    using Entry = std::pair<time_point, RoutingNode*>;

    /**
     * Ordering predicate for the retry queue. std::priority_queue keeps the
     * "largest" element on top, so the comparison is reversed here to make
     * the entry with the EARLIEST time point the top of the queue.
     */
    struct Cmp {
        // const-qualified so the comparator can be invoked through a const
        // object, as the standard Compare requirements expect.
        bool operator()(const Entry &a, const Entry &b) const {
            return (b.first < a.first);
        }
    };
    using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>;

    // NOTE(review): presumably guards _queue; the locking discipline lives in
    // the implementation file and is not visible from this header.
    std::mutex       _queue_mutex;
    PriorityQueue    _queue;        // Pending retries, earliest time point first.
    IRetryPolicy::SP _retryPolicy;  // Policy handed to the constructor.

public:
    /**
     * Convenience typedef.
     */
    using UP = std::unique_ptr<Resender>;

    // A resender is neither copy-constructible nor copy-assignable.
    Resender(const Resender &) = delete;
    Resender & operator = (const Resender &) = delete;

    /**
     * Constructs a new resender.
     *
     * @param retryPolicy The retry policy to use.
     */
    explicit Resender(IRetryPolicy::SP retryPolicy);

    /**
     * Empties the retry queue.
     */
    ~Resender();

    /**
     * Returns whether or not the current {@link IRetryPolicy} supports
     * resending a {@link Reply} that contains an error with the given error
     * code.
     *
     * @param errorCode The code to check.
     * @return True if the message can be resent.
     */
    [[nodiscard]] bool canRetry(uint32_t errorCode) const;

    /**
     * Returns whether or not the given reply should be retried.
     *
     * @param reply The reply to check.
     * @return True if retry is required.
     */
    [[nodiscard]] bool shouldRetry(const Reply &reply) const;

    /**
     * Schedules the given node for resending, if enabled by message. This will
     * invoke {@link RoutingNode#prepareForRetry()} if the node was queued. This
     * method is NOT thread-safe, and should only be called by the messenger
     * thread.
     *
     * @param node The node to resend.
     * @return True if the node was queued.
     */
    [[nodiscard]] bool scheduleRetry(RoutingNode &node);

    /**
     * Invokes {@link RoutingNode#send()} on all routing nodes that are
     * applicable for sending at the current time.
     */
    void resendScheduled();
};
} // namespace mbus
|