aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/visiting/visitormanager.h
blob: fefa2c218ab2d63f008e8b2af073b7a403380a38 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class storage::VisitorManager
 * @ingroup storageserver
 *
 * @brief Storage module for handling visitors.
 *
 * This module will dispatch iterator commands to the persistence layer, and
 * feed the results to the correct Visitor modules. As long as there are
 * active visitors, an iterator is running on the persistence layer. New
 * visitors hook into this stream and remember their starting position. The
 * iterator will loop around the database, and visitors receive EOF when they
 * are back at their starting position.
 *
 * @author Fledsbo
 * @date 2004-3-30
 * @version $Id$
 */

#pragma once

#include "commandqueue.h"
#include "visitor.h"
#include "visitormetrics.h"
#include "visitorthread.h"
#include <vespa/storage/visiting/config-stor-visitor.h>
#include <vespa/storage/common/storagelink.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageapi/message/datagram.h>
#include <vespa/storageapi/message/internal.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/config/helper/ifetchercallback.h>

// Forward declarations of config types. Neither name is referenced elsewhere
// in this header; NOTE(review): possibly leftovers -- check includers that may
// rely on them before removing.
namespace config {
class ConfigFetcher;
class ConfigUri;
}

namespace storage {

class RequestStatusPageReply;

/**
 * Storage link that dispatches CreateVisitorCommands onto a set of
 * VisitorThreads, queueing commands (see _visitorQueue) when the per-priority
 * concurrency limit computed by maximumConcurrent() is reached.
 */
class VisitorManager : public framework::Runnable,
                       public StorageLink,
                       public framework::HtmlStatusReporter,
                       private VisitorMessageHandler,
                       private framework::MetricUpdateHook
{
private:
    using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig;

    // NOTE(review): keep member declaration order unchanged -- the
    // constructor's initializer list (out of view) may depend on it.

    StorageComponentRegister& _componentRegister;
    VisitorMessageSessionFactory& _messageSessionFactory;
    // One entry per visitor thread: the thread plus a map keyed by visitor id.
    // NOTE(review): presumably the visitors currently running on that thread,
    // with the string being the visitor's name -- confirm in the .cpp.
    std::vector<std::pair<std::shared_ptr<VisitorThread>,
                          std::map<api::VisitorId, std::string>> > _visitorThread;

    // Bookkeeping for a message sent on behalf of a visitor.
    struct MessageInfo {
        api::VisitorId id;
        vespalib::system_time timestamp;
        vespalib::duration timeout;
        std::string destination;
    };

    // Visitor messages keyed by storage message id (presumably those still
    // awaiting a reply; see hasPendingMessageState()).
    std::map<api::StorageMessage::Id, MessageInfo> _visitorMessages;
    mutable std::mutex      _visitorLock;
    std::condition_variable _visitorCond;
    uint64_t _visitorCounter;
    std::shared_ptr<VisitorMetrics> _metrics;
    // Concurrency limits; see maximumConcurrent() for how they are combined.
    uint32_t _maxFixedConcurrentVisitors;
    uint32_t _maxVariableConcurrentVisitors;
    uint32_t _maxVisitorQueueSize;
    std::map<std::string, api::VisitorId> _nameToId;
    StorageComponent _component;
    std::unique_ptr<framework::Thread> _thread;
    CommandQueue<api::CreateVisitorCommand> _visitorQueue;
    std::deque<std::pair<std::string, vespalib::steady_time> > _recentlyDeletedVisitors;
    vespalib::duration _recentlyDeletedMaxTime;

    mutable std::mutex _statusLock; // Only one can get status at a time
    mutable std::condition_variable _statusCond;// Notify when done
    mutable std::vector<std::shared_ptr<RequestStatusPageReply> > _statusRequest;
    bool _enforceQueueUse;
    VisitorFactory::Map _visitorFactories;
public:
    VisitorManager(const StorVisitorConfig& bootstrap_config,
                   StorageComponentRegister&,
                   VisitorMessageSessionFactory&,
                   VisitorFactory::Map external = VisitorFactory::Map(),
                   bool defer_manager_thread_start = false);
    ~VisitorManager() override;

    void onClose() override;
    void print(std::ostream& out, bool verbose, const std::string& indent) const override;
    uint32_t getActiveVisitorCount() const;
    void setTimeBetweenTicks(uint32_t time);

    // Same type as the constructor's bootstrap config; uses the class alias.
    void on_configure(const StorVisitorConfig&);

    /** For unit testing: fixed limit only, no priority-dependent share. */
    void setMaxConcurrentVisitors(uint32_t count) {
        _maxFixedConcurrentVisitors = count;
        _maxVariableConcurrentVisitors = 0;
    }

    /** For unit testing: see maximumConcurrent() for how these combine. */
    void setMaxConcurrentVisitors(uint32_t fixed, uint32_t variable) {
        _maxFixedConcurrentVisitors = fixed;
        _maxVariableConcurrentVisitors = variable;
    }

    /** For unit testing. */
    void setMaxVisitorQueueSize(uint32_t count) {
        _maxVisitorQueueSize = count;
    }

    /**
     * For unit testing.
     * @throws std::out_of_range if index is not a valid visitor thread index.
     */
    VisitorThread& getThread(uint32_t index) {
        return *_visitorThread.at(index).first;
    }
    /** For unit testing. */
    bool hasPendingMessageState() const;
    // Must be called exactly once iff manager was created with defer_manager_thread_start == true
    void create_and_start_manager_thread();

    void enforceQueueUsage() { _enforceQueueUse = true; }

private:
    using MonitorGuard = std::unique_lock<std::mutex>;
    // framework::Runnable entry point for the manager thread.
    void run(framework::ThreadHandle&) override;

    /**
     * Schedules a visitor for running. onCreateVisitor will typically call
     * this with skipQueue = false, and closed(id) will typically call it with
     * skipQueue = true to schedule next visitor in queue.
     *
     * @param visitorLock guard holding the visitor lock on entry. NOTE(review):
     *        per attemptScheduleQueuedVisitor()'s contract it is released on
     *        successful scheduling -- confirm in the .cpp.
     * @return True if successful, false if failed and reply is sent.
     */
    bool scheduleVisitor(const std::shared_ptr<api::CreateVisitorCommand>&,
                         bool skipQueue, MonitorGuard & visitorLock);

    bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override;

    bool onDown(const std::shared_ptr<api::StorageMessage>& r) override;
    bool onInternalReply(const std::shared_ptr<api::InternalReply>& r) override;
    bool processReply(const std::shared_ptr<api::StorageReply>&);

    /**
     * Internal function that is used for scheduling the highest
     * priority visitor--if any--for running. Called automatically
     * by closed(id). visitorLock must be held at the time of the call,
     * and will in the case of a successful scheduling be unlocked, as
     * scheduleVisitor() is called internally. If more visitors are
     * to be scheduled, the lock must first be re-acquired.
     *
     * @return true if a visitor was removed from the queue and scheduled,
     * false otherwise.
     */
    bool attemptScheduleQueuedVisitor(MonitorGuard& visitorLock);

    // VisitorMessageHandler implementation
    void send(const std::shared_ptr<api::StorageCommand>& cmd, Visitor& visitor) override;
    void send(const std::shared_ptr<api::StorageReply>& reply) override;
    void closed(api::VisitorId id) override;

    // framework::HtmlStatusReporter implementation
    void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;

    /**
     * The maximum amount of concurrent visitors for a priority is given
     * by the formula: fixed + variable * ((255 - priority) / 255), with the
     * variable part truncated toward zero. A priority of 0 gets the whole
     * variable share; a priority of 255 gets none of it.
     */
    uint32_t maximumConcurrent(const api::CreateVisitorCommand& cmd) const {
        const double variableShare = (255.0 - cmd.getPriority()) / 255.0;
        return _maxFixedConcurrentVisitors
               + static_cast<uint32_t>(_maxVariableConcurrentVisitors * variableShare);
    }

    // framework::MetricUpdateHook implementation
    void updateMetrics(const MetricLockGuard &) override;
};

}