aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storageapi/message/visitor.h
blob: fddb7604eff27f8dd2ed5f1debc4b6171d3d6f4a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @file storageapi/message/visitor.h
 *
 * Messages related to visitors, used by the visitor manager.
 */

#pragma once

#include <vespa/storageapi/defs.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/vdslib/container/parameters.h>
#include <vespa/vdslib/container/visitorstatistics.h>
#include <vespa/storageapi/messageapi/storagecommand.h>
#include <vespa/storageapi/messageapi/storagereply.h>

namespace storage::api {

/**
 * @class CreateVisitorCommand
 * @ingroup message
 *
 * @brief Command for creating a visitor.
 *
 * Bundles the settings of a visitor: the bucket space and buckets to visit,
 * the visitor library and instance id, the document selection, a timestamp
 * range, the control/data destinations and assorted tuning values. All
 * accessors below are plain getters/setters on the corresponding member; the
 * constructors, destructor, getBucket(), super_bucket_id() and print() are
 * defined out of line.
 */
class CreateVisitorCommand : public StorageCommand {
private:
    document::BucketSpace _bucketSpace;
    vespalib::string _libName; // Name of visitor library to use, e.g. DumpVisitor.so
    vdslib::Parameters _params;

    vespalib::string _controlDestination;
    vespalib::string _dataDestination;

    vespalib::string _docSelection;
    std::vector<document::BucketId> _buckets;
    Timestamp _fromTime;
    Timestamp _toTime;

    uint32_t _visitorCmdId;
    vespalib::string _instanceId;
    VisitorId _visitorId; // Set on storage node

    bool _visitRemoves;
    vespalib::string _fieldSet;
    bool _visitInconsistentBuckets;

    duration _queueTimeout;
    uint32_t _maxPendingReplyCount;
    uint32_t _version;

    uint32_t _maxBucketsPerVisitor;

public:
    /**
     * @param bucketSpace  Bucket space the visitor operates on.
     * @param libraryName  Name of the visitor library to use.
     * @param instanceId   Identifier of this visitor instance.
     * @param docSelection Document selection expression.
     */
    CreateVisitorCommand(document::BucketSpace bucketSpace,
                         vespalib::stringref libraryName,
                         vespalib::stringref instanceId,
                         vespalib::stringref docSelection);

    /** Create another command with similar visitor settings. */
    CreateVisitorCommand(const CreateVisitorCommand& template_);
    // Marked override for consistency with the other StorageCommand
    // subclasses in this file (see VisitorInfoCommand).
    ~CreateVisitorCommand() override;

    // --- Setters -----------------------------------------------------------
    void setVisitorCmdId(uint32_t id) { _visitorCmdId = id; }
    void setControlDestination(vespalib::stringref d) { _controlDestination = d; }
    void setDataDestination(vespalib::stringref d) { _dataDestination = d; }
    void setParameters(const vdslib::Parameters& params) { _params = params; }
    void setMaximumPendingReplyCount(uint32_t count) { _maxPendingReplyCount = count; }
    void setFieldSet(vespalib::stringref fieldSet) { _fieldSet = fieldSet; }
    void setVisitRemoves(bool value = true) { _visitRemoves = value; }
    void setVisitInconsistentBuckets(bool visitInconsistent = true) { _visitInconsistentBuckets = visitInconsistent; }
    /** Appends @p id to the list of buckets to visit (no de-duplication is done here). */
    void addBucketToBeVisited(const document::BucketId& id) { _buckets.push_back(id); }
    void setVisitorId(const VisitorId id) { _visitorId = id; }
    void setInstanceId(vespalib::stringref id) { _instanceId = id; }
    /** Sets the queue timeout. The argument is a `duration`, not a raw millisecond count. */
    void setQueueTimeout(duration timeout) { _queueTimeout = timeout; }
    void setFromTime(Timestamp ts) { _fromTime = ts; }
    void setToTime(Timestamp ts) { _toTime = ts; }

    // --- Getters -----------------------------------------------------------
    VisitorId getVisitorId() const { return _visitorId; }
    uint32_t getVisitorCmdId() const { return _visitorCmdId; }
    document::BucketSpace getBucketSpace() const { return _bucketSpace; }
    document::Bucket getBucket() const override;
    document::BucketId super_bucket_id() const;
    const vespalib::string & getLibraryName() const { return _libName; }
    const vespalib::string & getInstanceId() const { return _instanceId; }
    const vespalib::string & getControlDestination() const { return _controlDestination; }
    const vespalib::string & getDataDestination() const { return _dataDestination; }
    const vespalib::string & getDocumentSelection() const { return _docSelection; }
    const vdslib::Parameters& getParameters() const { return _params; }
    /** Mutable access to the parameters, allowing in-place modification. */
    vdslib::Parameters& getParameters() { return _params; }
    uint32_t getMaximumPendingReplyCount() const { return _maxPendingReplyCount; }
    const std::vector<document::BucketId>& getBuckets() const { return _buckets; }
    Timestamp getFromTime() const { return _fromTime; }
    Timestamp getToTime() const { return _toTime; }
    /** Mutable access to the bucket list, allowing in-place modification. */
    std::vector<document::BucketId>& getBuckets() { return _buckets; }
    bool visitRemoves() const { return _visitRemoves; }
    const vespalib::string& getFieldSet() const { return _fieldSet; }
    bool visitInconsistentBuckets() const { return _visitInconsistentBuckets; }
    duration getQueueTimeout() const { return _queueTimeout; }

    void setVisitorDispatcherVersion(uint32_t version) { _version = version; }
    uint32_t getVisitorDispatcherVersion() const { return _version; }

    void setMaxBucketsPerVisitor(uint32_t max) { _maxBucketsPerVisitor = max; }
    uint32_t getMaxBucketsPerVisitor() const { return _maxBucketsPerVisitor; }

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;
    DECLARE_STORAGECOMMAND(CreateVisitorCommand, onCreateVisitor)
};

/**
 * @class CreateVisitorReply
 * @ingroup message
 *
 * @brief Response to a create visitor command.
 */
class CreateVisitorReply : public StorageReply {
private:
    document::BucketId _super_bucket_id;
    document::BucketId _lastBucket;
    vdslib::VisitorStatistics _visitorStatistics;

public:
    explicit CreateVisitorReply(const CreateVisitorCommand& cmd);

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    void setLastBucket(const document::BucketId& lastBucket) { _lastBucket = lastBucket; }

    const document::BucketId& super_bucket_id() const { return _super_bucket_id; }
    const document::BucketId& getLastBucket() const { return _lastBucket; }

    void setVisitorStatistics(const vdslib::VisitorStatistics& stats) { _visitorStatistics = stats; }

    const vdslib::VisitorStatistics& getVisitorStatistics() const { return _visitorStatistics; }

    DECLARE_STORAGEREPLY(CreateVisitorReply, onCreateVisitorReply)
};

/**
 * @class DestroyVisitorCommand
 * @ingroup message
 *
 * @brief Command requesting removal of a visitor.
 *
 * The visitor to remove is named by its instance id, which is fixed at
 * construction; this class declares no setter for it.
 */
class DestroyVisitorCommand : public StorageCommand {
private:
    vespalib::string _instanceId;

public:
    /** @param instanceId Instance id of the visitor to destroy. */
    explicit DestroyVisitorCommand(vespalib::stringref instanceId);

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    /** Instance id of the visitor this command refers to. */
    const vespalib::string & getInstanceId() const { return _instanceId; }

    DECLARE_STORAGECOMMAND(DestroyVisitorCommand, onDestroyVisitor)
};

/**
 * @class DestroyVisitorReply
 * @ingroup message
 *
 * @brief Response to a destroy visitor command.
 *
 * Declares no data members of its own; any state it carries lives in the
 * StorageReply base class.
 */
class DestroyVisitorReply : public StorageReply {
public:
    /** Builds the reply for the given destroy command (defined out of line). */
    explicit DestroyVisitorReply(const DestroyVisitorCommand& cmd);

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    DECLARE_STORAGEREPLY(DestroyVisitorReply, onDestroyVisitorReply)
};

/**
 * @class VisitorInfoCommand
 * @ingroup message
 *
 * @brief Sends status information of an ongoing visitor.
 *
 * Includes three different kinds of data.
 *  - Notification when visiting is complete.
 *  - Notification when individual buckets have been completely visited.
 *    (Including the timestamp of the newest document visited)
 *  - Notification that some error condition arose during visiting.
 */
class VisitorInfoCommand : public StorageCommand {
public:
    /** A completed bucket together with the timestamp reported for it. */
    struct BucketTimestampPair {
        document::BucketId bucketId;
        Timestamp timestamp;

        /** Default-constructed bucket id and a zero timestamp. */
        BucketTimestampPair() noexcept : bucketId(), timestamp(0) {}
        BucketTimestampPair(const document::BucketId& bucket, const Timestamp& ts) noexcept
            : bucketId(bucket), timestamp(ts)
        {}

        /**
         * Two pairs are equal only when both the bucket id and the timestamp
         * match. The timestamps must be compared with ==; a logical AND
         * would merely test that both are non-zero.
         */
        bool operator==(const BucketTimestampPair& other) const noexcept {
            return (bucketId == other.bucketId && timestamp == other.timestamp);
        }
    };

private:
    bool _completed;
    std::vector<BucketTimestampPair> _bucketsCompleted;
    ReturnCode _error;

public:
    VisitorInfoCommand();
    ~VisitorInfoCommand() override;

    /** Takes ownership of @p code by moving it into this command. */
    void setErrorCode(ReturnCode && code) { _error = std::move(code); }
    /** Flags the visitor as completed; this class declares no way to clear the flag. */
    void setCompleted() { _completed = true; }
    /** Appends one completed bucket and its timestamp to the list. */
    void setBucketCompleted(const document::BucketId& id, Timestamp lastVisited) {
        _bucketsCompleted.emplace_back(id, lastVisited);
    }
    /** Replaces the whole list of completed buckets with a copy of @p bc. */
    void setBucketsCompleted(const std::vector<BucketTimestampPair>& bc) {
        _bucketsCompleted = bc;
    }

    const ReturnCode& getErrorCode() const { return _error; }
    const std::vector<BucketTimestampPair>& getCompletedBucketsList() const {
        return _bucketsCompleted;
    }
    bool visitorCompleted() const { return _completed; }

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    DECLARE_STORAGECOMMAND(VisitorInfoCommand, onVisitorInfo)
};

// Stream insertion operator for VisitorInfoCommand::BucketTimestampPair.
// Defined out of line; the exact output format is not visible from this header.
std::ostream& operator<<(std::ostream& out, const VisitorInfoCommand::BucketTimestampPair& pair);

/**
 * @class VisitorInfoReply
 * @ingroup message
 *
 * @brief Response to a visitor info command.
 *
 * Carries a single completion flag. The flag has no setter in this class, so
 * it is presumably taken from the command in the constructor (defined out of
 * line - confirm there).
 */
class VisitorInfoReply : public StorageReply {
    bool _completed;

public:
    // explicit, matching the other reply classes in this file, so a command
    // is never silently converted into a reply.
    explicit VisitorInfoReply(const VisitorInfoCommand& cmd);
    bool visitorCompleted() const { return _completed; }
    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    DECLARE_STORAGEREPLY(VisitorInfoReply, onVisitorInfoReply)
};

}