aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/routing/resender.h
blob: b395639ec2d686e346b445b21bc577cd824b7898 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "iretrypolicy.h"
#include <vespa/messagebus/queue.h>
#include <vespa/messagebus/reply.h>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

namespace mbus {

class RoutingNode;

/**
 * The resender handles scheduling and execution of sending instances of {@link
 * RoutingNode}. An instance of this class is owned by {@link
 * com.yahoo.messagebus.MessageBus}. Because this class does not have any
 * internal thread, it depends on message bus to keep polling it whenever it has
 * time.
 */
class Resender
{
private:
    using time_point = std::chrono::steady_clock::time_point;

    /**
     * A queued retry: a time point paired with a raw pointer to the routing
     * node. The time point is what the queue is ordered by (presumably the
     * earliest time at which the node should be resent; confirm in the
     * implementation file). Ownership of the node is not expressed here.
     */
    using Entry = std::pair<time_point , RoutingNode*>;

    /**
     * Ordering functor for the priority queue. std::priority_queue keeps the
     * "largest" element according to the comparator on top, so comparing with
     * the operands swapped puts the entry with the EARLIEST time point on top.
     * The call operator is const so the stateless comparator can be invoked
     * through a const object.
     */
    struct Cmp {
        bool operator()(const Entry &a, const Entry &b) const {
            return (b.first < a.first);
        }
    };
    using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>;

    std::mutex       _queue_mutex;  // presumably guards _queue; verify against the implementation
    PriorityQueue    _queue;        // pending retries, earliest time point on top
    IRetryPolicy::SP _retryPolicy;  // policy supplied at construction
public:
    /**
     * Convenience typedefs.
     */
    using UP = std::unique_ptr<Resender>;

    // Not copyable: the class holds a std::mutex, which itself cannot be copied.
    Resender(const Resender &) = delete;
    Resender & operator = (const Resender &) = delete;

    /**
     * Constructs a new resender.
     *
     * @param retryPolicy The retry policy to use.
     */
    explicit Resender(IRetryPolicy::SP retryPolicy);

    /**
     * Empties the retry queue.
     */
    ~Resender();

    /**
     * Returns whether or not the current {@link IRetryPolicy} supports
     * resending a {@link Reply} that contains an error with the given error
     * code.
     *
     * @param errorCode The code to check.
     * @return True if the message can be resent.
     */
    [[nodiscard]] bool canRetry(uint32_t errorCode) const;

    /**
     * Returns whether or not the given reply should be retried.
     *
     * @param reply The reply to check.
     * @return True if retry is required.
     */
    [[nodiscard]] bool shouldRetry(const Reply &reply) const;

    /**
     * Schedules the given node for resending, if enabled by message. This will
     * invoke {@link RoutingNode#prepareForRetry()} if the node was queued. This
     * method is NOT thread-safe, and should only be called by the messenger
     * thread.
     *
     * NOTE(review): this class also declares a queue mutex; confirm against the
     * implementation whether the thread-safety remark above is still accurate.
     *
     * @param node The node to resend.
     * @return True if the node was queued.
     */
    [[nodiscard]] bool scheduleRetry(RoutingNode &node);

    /**
     * Invokes {@link RoutingNode#send()} on all routing nodes that are
     * applicable for sending at the current time.
     */
    void resendScheduled();
};

} // namespace mbus