aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storageapi/messageapi/storagemessage.h
blob: af7f5e6cd7d94ff481d6681b462a2e51041afbad (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

/**
 * Superclass for all storage messages.
 */

#pragma once

#include "messagehandler.h"
#include <vespa/messagebus/routing/route.h>
#include <vespa/messagebus/trace.h>
#include <vespa/vdslib/state/nodetype.h>
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/util/printable.h>
#include <map>
#include <iosfwd>

namespace vespalib { class asciistream; }
// The following macros are provided as a way to write storage messages simply.
// They implement the parts of the code that can easily be automaticly
// generated.

/**
 * Adds a messagehandler callback and some utilities
 */
#define DECLARE_POINTER_TYPEDEFS(message) \
    using UP = std::unique_ptr<message>; \
    using SP = std::shared_ptr<message>; \
    using CSP = std::shared_ptr<const message>;

#define DECLARE_STORAGEREPLY(reply, callback) \
public: \
    DECLARE_POINTER_TYPEDEFS(reply) \
private: \
    bool callHandler(storage::api::MessageHandler& h, \
                     const std::shared_ptr<storage::api::StorageMessage>& m) const override \
    { \
        return h.callback(std::static_pointer_cast<reply>(m)); \
    }

/** Commands also has a command to implement to create the reply. */
#define DECLARE_STORAGECOMMAND(command, callback) \
public: \
    std::unique_ptr<storage::api::StorageReply> makeReply() override; \
    DECLARE_STORAGEREPLY(command, callback)

/** This macro implements common stuff for all storage messages. */
#define IMPLEMENT_COMMON(message) \

/** This macro is used to implement common storage reply functionality. */
#define IMPLEMENT_REPLY(reply) \
    IMPLEMENT_COMMON(reply) \

/** This macro is used to implement common storage command functionality. */
#define IMPLEMENT_COMMAND(command, reply) \
    IMPLEMENT_COMMON(command) \
    std::unique_ptr<storage::api::StorageReply> \
    storage::api::command::makeReply() \
    { \
        return std::make_unique<reply>(*this); \
    }

namespace storage::api {

using duration = vespalib::duration;

/**
 * This class defines the different message types we have.
 *
 * This is used to be able to deserialize messages of various classes.
 */
class MessageType : public vespalib::Printable {
public:
    enum Id {
        GET_ID = 4,
        GET_REPLY_ID = 5,
        INTERNAL_ID = 6,
        INTERNAL_REPLY_ID = 7,
        PUT_ID = 10,
        PUT_REPLY_ID = 11,
        REMOVE_ID = 12,
        REMOVE_REPLY_ID = 13,
        // REVERT_ID = 14,       unused but reserved
        // REVERT_REPLY_ID = 15, unused but reserved
        STAT_ID = 16,
        STAT_REPLY_ID = 17,
        VISITOR_CREATE_ID = 18,
        VISITOR_CREATE_REPLY_ID = 19,
        VISITOR_DESTROY_ID = 20,
        VISITOR_DESTROY_REPLY_ID = 21,
        CREATEBUCKET_ID = 26,
        CREATEBUCKET_REPLY_ID = 27,
        MERGEBUCKET_ID = 32,
        MERGEBUCKET_REPLY_ID = 33,
        DELETEBUCKET_ID = 34,
        DELETEBUCKET_REPLY_ID = 35,
        SETNODESTATE_ID = 36,
        SETNODESTATE_REPLY_ID = 37,
        GETNODESTATE_ID = 38,
        GETNODESTATE_REPLY_ID = 39,
        SETSYSTEMSTATE_ID = 40,
        SETSYSTEMSTATE_REPLY_ID = 41,
        GETSYSTEMSTATE_ID = 42,
        GETSYSTEMSTATE_REPLY_ID = 43,
        GETBUCKETDIFF_ID = 50,
        GETBUCKETDIFF_REPLY_ID = 51,
        APPLYBUCKETDIFF_ID = 52,
        APPLYBUCKETDIFF_REPLY_ID = 53,
        REQUESTBUCKETINFO_ID = 54,
        REQUESTBUCKETINFO_REPLY_ID = 55,
        NOTIFYBUCKETCHANGE_ID = 56,
        NOTIFYBUCKETCHANGE_REPLY_ID = 57,
        DOCBLOCK_ID = 58,
        DOCBLOCK_REPLY_ID = 59,
        VISITOR_INFO_ID = 60,
        VISITOR_INFO_REPLY_ID = 61,
        // SEARCHRESULT_ID = 64,
        // SEARCHRESULT_REPLY_ID = 65,
        SPLITBUCKET_ID = 66,
        SPLITBUCKET_REPLY_ID = 67,
        JOINBUCKETS_ID = 68,
        JOINBUCKETS_REPLY_ID = 69,
        // DOCUMENTSUMMARY_ID = 72,
        // DOCUMENTSUMMARY_REPLY_ID = 73,
        MAPVISITOR_ID = 74,
        MAPVISITOR_REPLY_ID = 75,
        STATBUCKET_ID = 76,
        STATBUCKET_REPLY_ID = 77,
        GETBUCKETLIST_ID = 78,
        GETBUCKETLIST_REPLY_ID = 79,
        DOCUMENTLIST_ID = 80,
        DOCUMENTLIST_REPLY_ID = 81,
        UPDATE_ID = 82,
        UPDATE_REPLY_ID = 83,
        EMPTYBUCKETS_ID = 84,
        EMPTYBUCKETS_REPLY_ID = 85,
        REMOVELOCATION_ID = 86,
        REMOVELOCATION_REPLY_ID = 87,
        QUERYRESULT_ID = 88,
        QUERYRESULT_REPLY_ID = 89,
        SETBUCKETSTATE_ID = 94,
        SETBUCKETSTATE_REPLY_ID = 95,
        ACTIVATE_CLUSTER_STATE_VERSION_ID = 96,
        ACTIVATE_CLUSTER_STATE_VERSION_REPLY_ID = 97,
        MESSAGETYPE_MAX_ID
    };

private:
    static std::map<Id, MessageType*> _codes;
    const vespalib::string _name;
    Id _id;
    MessageType *_reply;
    const MessageType *_replyOf;

    MessageType(vespalib::stringref name, Id id, const MessageType* replyOf = 0);
public:
    static const MessageType DOCBLOCK;
    static const MessageType DOCBLOCK_REPLY;
    static const MessageType GET;
    static const MessageType GET_REPLY;
    static const MessageType INTERNAL;
    static const MessageType INTERNAL_REPLY;
    static const MessageType PUT;
    static const MessageType PUT_REPLY;
    static const MessageType REMOVE;
    static const MessageType REMOVE_REPLY;
    static const MessageType VISITOR_CREATE;
    static const MessageType VISITOR_CREATE_REPLY;
    static const MessageType VISITOR_DESTROY;
    static const MessageType VISITOR_DESTROY_REPLY;
    static const MessageType REQUESTBUCKETINFO;
    static const MessageType REQUESTBUCKETINFO_REPLY;
    static const MessageType NOTIFYBUCKETCHANGE;
    static const MessageType NOTIFYBUCKETCHANGE_REPLY;
    static const MessageType CREATEBUCKET;
    static const MessageType CREATEBUCKET_REPLY;
    static const MessageType MERGEBUCKET;
    static const MessageType MERGEBUCKET_REPLY;
    static const MessageType DELETEBUCKET;
    static const MessageType DELETEBUCKET_REPLY;
    static const MessageType SETNODESTATE;
    static const MessageType SETNODESTATE_REPLY;
    static const MessageType GETNODESTATE;
    static const MessageType GETNODESTATE_REPLY;
    static const MessageType SETSYSTEMSTATE;
    static const MessageType SETSYSTEMSTATE_REPLY;
    static const MessageType GETSYSTEMSTATE;
    static const MessageType GETSYSTEMSTATE_REPLY;
    static const MessageType ACTIVATE_CLUSTER_STATE_VERSION;
    static const MessageType ACTIVATE_CLUSTER_STATE_VERSION_REPLY;
    static const MessageType BUCKETSADDED;
    static const MessageType BUCKETSADDED_REPLY;
    static const MessageType BUCKETSREMOVED;
    static const MessageType BUCKETSREMOVED_REPLY;
    static const MessageType GETBUCKETDIFF;
    static const MessageType GETBUCKETDIFF_REPLY;
    static const MessageType APPLYBUCKETDIFF;
    static const MessageType APPLYBUCKETDIFF_REPLY;
    static const MessageType VISITOR_INFO;
    static const MessageType VISITOR_INFO_REPLY;
    static const MessageType SPLITBUCKET;
    static const MessageType SPLITBUCKET_REPLY;
    static const MessageType JOINBUCKETS;
    static const MessageType JOINBUCKETS_REPLY;
    static const MessageType MAPVISITOR;
    static const MessageType MAPVISITOR_REPLY;
    static const MessageType STATBUCKET;
    static const MessageType STATBUCKET_REPLY;
    static const MessageType GETBUCKETLIST;
    static const MessageType GETBUCKETLIST_REPLY;
    static const MessageType DOCUMENTLIST;
    static const MessageType DOCUMENTLIST_REPLY;
    static const MessageType UPDATE;
    static const MessageType UPDATE_REPLY;
    static const MessageType EMPTYBUCKETS;
    static const MessageType EMPTYBUCKETS_REPLY;
    static const MessageType REMOVELOCATION;
    static const MessageType REMOVELOCATION_REPLY;
    static const MessageType QUERYRESULT;
    static const MessageType QUERYRESULT_REPLY;
    static const MessageType SETBUCKETSTATE;
    static const MessageType SETBUCKETSTATE_REPLY;

    static const MessageType& get(Id id);

    MessageType(const MessageType &) = delete;
    MessageType& operator=(const MessageType &) = delete;
    ~MessageType();
    Id getId() const noexcept { return _id; }
    static Id getMaxId() noexcept { return MESSAGETYPE_MAX_ID; }
    const vespalib::string& getName() const noexcept { return _name; }
    bool isReply() const noexcept { return (_replyOf != 0); }
    /** Only valid to call on replies. */
    const MessageType& getCommandType() const noexcept { return *_replyOf; }
    /** Only valid to call on commands. */
    const MessageType& getReplyType() const noexcept { return *_reply; }
    bool operator==(const MessageType& type) const noexcept { return (_id == type._id); }
    bool operator!=(const MessageType& type) const noexcept { return (_id != type._id); }

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;
};

/**
 * Represent an address we can send a storage message to.
 * We have two kinds of addresses:
 * - A VDS address used to send to a single VDS node.
 * - An external mbus route, used to send to an external source.
 */
class StorageMessageAddress {
public:
    enum class Protocol : uint8_t { STORAGE, DOCUMENT };

private:
    const vespalib::string  *_cluster;
    // Used for internal VDS addresses only
    uint32_t                 _precomputed_storage_hash;
    lib::NodeType::Type      _type;
    Protocol                 _protocol;
    uint16_t                 _index;

public:
    StorageMessageAddress() noexcept; // Only to be used when transient default ctor semantics are needed by containers
    StorageMessageAddress(const vespalib::string * cluster, const lib::NodeType& type, uint16_t index) noexcept;
    StorageMessageAddress(const vespalib::string * cluster, const lib::NodeType& type, uint16_t index, Protocol protocol) noexcept;
    ~StorageMessageAddress();

    void setProtocol(Protocol p) noexcept { _protocol = p; }

    mbus::Route to_mbus_route() const;
    Protocol getProtocol() const noexcept { return _protocol; }
    uint16_t getIndex() const noexcept { return _index; }
    lib::NodeType::Type getNodeType() const noexcept { return _type; }
    const vespalib::string& getCluster() const noexcept { return *_cluster; }

    // Returns precomputed hash over <type, index> pair. Other fields not included.
    [[nodiscard]] uint32_t internal_storage_hash() const noexcept {
        return _precomputed_storage_hash;
    }

    bool operator==(const StorageMessageAddress& other) const noexcept;
    vespalib::string toString() const;
    friend std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr);
    static StorageMessageAddress create(const vespalib::string * cluster, const lib::NodeType& type, uint16_t index) noexcept {
        return api::StorageMessageAddress(cluster, type, index);
    }
    static StorageMessageAddress createDocApi(const vespalib::string * cluster, const lib::NodeType& type, uint16_t index) noexcept {
        return api::StorageMessageAddress(cluster, type, index, Protocol::DOCUMENT);
    }
private:
    void print(vespalib::asciistream & out) const;
};

struct TransportContext {
    virtual ~TransportContext() = 0;
};

enum class LockingRequirements : uint8_t {
    // Operations with exclusive locking can only be executed iff no other
    // exclusive or shared locks are taken for its bucket.
    Exclusive = 0,
    // Operations with shared locking can only be executed iff no exclusive
    // lock is taken for its bucket. Should only be used for read-only operations
    // that cannot mutate a bucket's state.
    Shared
};

const char* to_string(LockingRequirements req) noexcept;
std::ostream& operator<<(std::ostream&, LockingRequirements);

// This mirrors spi::ReadConsistency and has the same semantics, but is
// decoupled to avoid extra cross-module dependencies.
// Note that the name _internal_ read consistency is intentional to lessen
// any ambiguities on whether this is consistency in a distributed systems
// setting (i.e. linearizability) on internally in the persistence provider.
enum class InternalReadConsistency : uint8_t {
    Strong = 0,
    Weak
};

const char* to_string(InternalReadConsistency consistency) noexcept;
std::ostream& operator<<(std::ostream&, InternalReadConsistency);

class StorageMessage : public vespalib::Printable
{
    friend class StorageMessageTest; // Used for testing only
public:
    DECLARE_POINTER_TYPEDEFS(StorageMessage);
    using Id = uint64_t;
    using Priority = uint8_t;

    enum LegacyPriorityValues {
        LOW = 225,
        NORMAL = 127,
        HIGH = 50,
        VERYHIGH = 0
    }; // FIXME
    //static const unsigned int NUM_PRIORITIES = UINT8_MAX;
    static const char* getPriorityString(Priority);

private:
    static document::Bucket getDummyBucket() noexcept {
        return {document::BucketSpace::invalid(), document::BucketId()};
    }

    mutable std::unique_ptr<TransportContext> _transportContext;
    const MessageType&    _type;
    Id                    _internal_msg_id;
    Id                    _originator_msg_id;
    StorageMessageAddress _address;
    vespalib::Trace       _trace;
    uint32_t              _approxByteSize;
    Priority              _priority;

protected:
    static Id generateMsgId() noexcept;

    StorageMessage(const MessageType& code, Id internal_id, Id originator_id) noexcept;
    StorageMessage(const StorageMessage&, Id internal_id, Id originator_id) noexcept;

public:
    StorageMessage& operator=(const StorageMessage&) = delete;
    StorageMessage(const StorageMessage&) = delete;
    ~StorageMessage() override;

    /**
     * Process-unique internal ID. For replies, corresponds to the message ID of its command.
     */
    Id getMsgId() const noexcept { return _internal_msg_id; }
    /**
     * If the message has an originator (i.e. sent from a peer), this is that peer's process-unique internal ID.
     * It must never be used by any _other_ nodes in a context where uniqueness is assumed.
     */
    [[nodiscard]] Id originator_msg_id() const noexcept { return _originator_msg_id; }

    /**
     * Set the originator ID of this message, to associate local messages with those
     * generated by a remote peer. Does not affect the process-unique internal ID.
     **/
    void force_originator_msg_id(Id msgId) noexcept { _originator_msg_id = msgId; }

    const MessageType& getType() const noexcept { return _type; }

    void setPriority(Priority p) noexcept { _priority = p; }
    Priority getPriority() const noexcept { return _priority; }

    const StorageMessageAddress* getAddress() const noexcept { return (_address.getNodeType() != lib::NodeType::Type::UNKNOWN) ? &_address : nullptr; }

    void setAddress(const StorageMessageAddress& address) noexcept {
        _address = address;
    }

    /**
     *  Returns the approximate memory footprint (in bytes) of a storage message.
     *  By default, returns 50 bytes.
     */
    uint32_t getApproxByteSize() const noexcept {
        return _approxByteSize;
    }

    void setApproxByteSize(uint32_t value) noexcept {
        _approxByteSize = value;
    }

    /**
     * Used by storage to remember the context in which this message was
     * created, whether it was a storageprotocol message, a documentprotocol
     * message, or an RPC call.
     */
    void setTransportContext(std::unique_ptr<TransportContext> context) noexcept {
        _transportContext = std::move(context);
    }

    std::unique_ptr<TransportContext> getTransportContext() const noexcept {
        return std::move(_transportContext);
    }

    bool has_transport_context() const noexcept {
        return (_transportContext.get() != nullptr);
    }

    /**
     * This method is overloaded in subclasses and will call the correct
     * method in the MessageHandler interface.
     */
    virtual bool callHandler(MessageHandler&, const StorageMessage::SP&) const = 0;
    virtual bool hasTestAndSetCondition() const noexcept { return false; }

    mbus::Trace && steal_trace() noexcept { return std::move(_trace); }
    mbus::Trace& getTrace() noexcept { return _trace; }
    const mbus::Trace& getTrace() const noexcept { return _trace; }

    /**
       Sets the trace object for this message.
    */
    void setTrace(vespalib::Trace && trace) noexcept { _trace = std::move(trace); }

    /**
     * Cheap version of tostring().
     */
    virtual vespalib::string getSummary() const;

    virtual document::Bucket getBucket() const { return getDummyBucket(); }
    document::BucketId getBucketId() const noexcept { return getBucket().getBucketId(); }
    virtual LockingRequirements lockingRequirements() const noexcept {
        // Safe default: assume exclusive locking is required.
        return LockingRequirements::Exclusive;
    }
};

}