aboutsummaryrefslogtreecommitdiffstats
path: root/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
blob: 0433ca205b697ef8c38f31d4db5624b03f3a3af5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "map_listener.h"
#include "map_source.h"
#include "mapping_monitor.h"
#include "named_service.h"
#include "proxy_map_source.h"
#include "request_completion_handler.h"
#include "service_map_history.h"
#include "service_mapping.h"

#include <vespa/fnet/task.h>

#include <vector>
#include <memory>
#include <map>

namespace slobrok {

/**
 * @class LocalRpcMonitorMap
 * @brief Map from service name to per-service state (spec, up/down, local-only)
 *
 * Tracks up/down status for name->spec combinations
 * that are considered for publication locally.
 *
 * The class is both a MapListener and a MappingMonitorOwner; it overrides
 * add()/remove() and up()/down() below.  All non-inline member functions
 * are defined outside this header.
 **/
class LocalRpcMonitorMap : public MapListener,
                           public MappingMonitorOwner
{
private:
    /** Kind of change carried by a queued Event. */
    enum class EventType { ADD, REMOVE };

    /** A pending change: the kind of change plus the mapping it applies to. */
    struct Event {
        EventType type;
        ServiceMapping mapping;
        /** Builds an ADD event holding a copy of @p value. */
        static Event add(const ServiceMapping &value) {
            return Event{EventType::ADD, value};
        }
        /** Builds a REMOVE event holding a copy of @p value. */
        static Event remove(const ServiceMapping &value) {
            return Event{EventType::REMOVE, value};
        }
    };

    /**
     * FNET task that queues events and asks the scheduler to run it.
     * PerformTask() is defined outside this header; presumably it applies
     * the queued events to _target -- TODO confirm against the .cpp file.
     **/
    class DelayedTasks : public FNET_Task {
        std::vector<Event>  _queue;   // events waiting for PerformTask()
        LocalRpcMonitorMap &_target;  // the map this task belongs to (not owned)
    public:
        /** Appends @p event to the queue and schedules this task via ScheduleNow(). */
        void handleLater(Event event) {
            _queue.emplace_back(std::move(event));
            ScheduleNow();
        }

        void PerformTask() override;

        DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target)
          : FNET_Task(scheduler),
            _queue(),
            _target(target)
        {}

        // Kill() is called before the queue and target reference go away.
        ~DelayedTasks() override { Kill(); }
    };

    // Declared before the other data members, so it is constructed first
    // and destroyed last among them.
    DelayedTasks _delayedTasks;

    /**
     * State kept for each entry in the map.  Move-only: copying is deleted,
     * and the move operations and destructor are defined outside this header.
     **/
    struct PerService {
        bool up;          // current up/down flag
        bool localOnly;   // true for entries created via localService()
        std::unique_ptr<CompletionHandler> inflight; // optional pending completion handler
        vespalib::string spec;
        PerService(bool up_in, bool local_only, std::unique_ptr<CompletionHandler> inflight_in, vespalib::stringref spec_in)
            : up(up_in), localOnly(local_only), inflight(std::move(inflight_in)), spec(spec_in)
        {}
        PerService(const PerService &) = delete;
        PerService & operator=(const PerService &) = delete;
        PerService(PerService &&) noexcept;
        PerService & operator =(PerService &&) noexcept;
        ~PerService();
    };

    /**
     * Creates the state for a local-only entry: not up, localOnly set,
     * taking ownership of @p inflight and using the spec from @p mapping.
     **/
    static PerService localService(const ServiceMapping &mapping,
                            std::unique_ptr<CompletionHandler> inflight)
    {
        return {false, true, std::move(inflight), mapping.spec};
    }

    /**
     * Creates the state for a non-local entry: not up, localOnly cleared,
     * no inflight handler, and the spec from @p mapping.
     **/
    static PerService globalService(const ServiceMapping &mapping) {
        return {false, false, {}, mapping.spec};
    }

    // Ordered map of string key -> PerService; the key is presumably the
    // service name from ServiceMapping -- TODO confirm in the .cpp file.
    using Map = std::map<vespalib::string, PerService>;

    Map _map;
    ProxyMapSource _dispatcher;     // exposed to callers through dispatcher()
    ServiceMapHistory _history;
    MappingMonitor::UP _mappingMonitor;
    std::unique_ptr<MapSubscription> _subscription;

    void doAdd(const ServiceMapping &mapping);
    void doRemove(const ServiceMapping &mapping);

    PerService & lookup(const ServiceMapping &mapping);

    void addToMap(const ServiceMapping &mapping, PerService psd, bool hurry);

    /**
     * Result type of removeFromMap(): the mapping together with the up,
     * localOnly and inflight fields.  The destructor is defined outside
     * this header.
     **/
    struct RemovedData {
        ServiceMapping mapping;
        bool up;
        bool localOnly;
        std::unique_ptr<CompletionHandler> inflight;
        ~RemovedData();
    };

    RemovedData removeFromMap(Map::iterator iter);

public:
    LocalRpcMonitorMap(FNET_Scheduler *scheduler,
                       MappingMonitorFactory mappingMonitorFactory);
    ~LocalRpcMonitorMap() override;

    /** Returns the internal dispatcher as a MapSource reference. */
    MapSource &dispatcher() { return _dispatcher; }
    ServiceMapHistory & history();

    [[nodiscard]] bool wouldConflict(const ServiceMapping &mapping) const;

    /** for use by register API, will call doneHandler() on inflight script */
    void addLocal(const ServiceMapping &mapping,
                  std::unique_ptr<CompletionHandler> inflight);

    /** for use by unregister API */
    void removeLocal(const ServiceMapping &mapping);

    // Overrides of base-class virtuals; presumably add()/remove() come from
    // MapListener and up()/down() from MappingMonitorOwner -- the base
    // declarations are not visible in this header.
    void add(const ServiceMapping &mapping) override;
    void remove(const ServiceMapping &mapping) override;

    void up(const ServiceMapping& mapping) override;
    void down(const ServiceMapping& mapping) override;
};

//-----------------------------------------------------------------------------

} // namespace slobrok