// path: root/storage/src/vespa/storage/persistence/persistenceutil.h
// blob: 4bd0222bb9e0810b61d408f2a04ddd814feae482
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/persistence/filestorage/filestorhandler.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/persistence/spi/result.h>
#include <vespa/persistence/spi/context.h>
#include <vespa/vespalib/io/fileutil.h>

// Forward declarations of storage API types that this header refers to by name:
// StorageMessage and StorageReply are held through std::shared_ptr / returned by
// reference, and BucketInfo appears only in function signatures.
namespace storage::api {
    class StorageMessage;
    class StorageReply;
    class BucketInfo;
}

// Forward declaration of the persistence provider type; this header only takes
// and stores it by reference.
namespace storage::spi {
    struct PersistenceProvider;
}

namespace storage {

// Declared ahead of its definition because MessageTracker refers to it by reference.
class PersistenceUtil;

/**
 * Per-message bookkeeping used while a storage message is being processed.
 *
 * A tracker holds the bucket lock, the message itself, a throttle token, an
 * SPI context, the reply (if one has been set) and the current result code,
 * together with references to the PersistenceUtil environment and the
 * MessageSender that replies are handed to. The out-of-line members are
 * implemented in the corresponding source file.
 */
class MessageTracker {
public:
    using UP = std::unique_ptr<MessageTracker>;

    /**
     * @param timer          timer copied into the tracker
     * @param env            persistence environment, kept by reference
     * @param replySender    sender used for replies, kept by reference
     * @param bucketLock     lock on the bucket the message operates on
     * @param msg            the message being processed
     * @param throttle_token throttle token held by the tracker
     */
    MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
                   FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
                   ThrottleToken throttle_token);

    ~MessageTracker();

    /** Associate an operation metric with this tracker. */
    void setMetric(FileStorThreadMetrics::Op& metric);

    /**
     * Called by operation handlers to set reply if they need to send a
     * non-default reply. They should call this function as soon as they create
     * a reply, to ensure it is stored in case of failure after reply creation.
     *
     * Asserts that no reply has been set already.
     */
    void setReply(std::shared_ptr<api::StorageReply> reply) {
        assert( ! _reply );
        _reply = std::move(reply);
    }

    /**
     * Utility function to be able to write a bit less in client.
     *
     * @param result  numeric result code, converted to api::ReturnCode::Result
     * @param message optional failure message
     */
    void fail(uint32_t result, const vespalib::string & message = "") {
        fail(api::ReturnCode(static_cast<api::ReturnCode::Result>(result), message));
    }
    /** Set the request to fail with the given failure. */
    void fail(const api::ReturnCode&);

    /** Don't send reply for the command being processed. Used by multi chain
     * commands like merge. */
    void dontReply() { _sendReply = false; }

    /** @return true iff a reply is currently stored in the tracker. */
    bool hasReply() const { return static_cast<bool>(_reply); }

    /**
     * Access the stored reply.
     * The stored pointer is dereferenced without a check, so callers must make
     * sure hasReply() is true first.
     */
    const api::StorageReply & getReply() const {
        return *_reply;
    }
    api::StorageReply & getReply() {
        return *_reply;
    }

    /**
     * Hand out the stored reply pointer as an rvalue so the caller can move it
     * out. Only callable on an rvalue tracker (note the && qualifier).
     */
    std::shared_ptr<api::StorageReply> && stealReplySP() && {
        return std::move(_reply);
    }

    void generateReply(api::StorageCommand& cmd);

    /** @return a copy of the current result code. */
    api::ReturnCode getResult() const { return _result; }

    /** @return the SPI context owned by this tracker. */
    spi::Context & context() { return _context; }

    /** @return the id of the bucket covered by the held bucket lock. */
    document::BucketId getBucketId() const {
        return _bucketLock->getBucket().getBucketId();
    }

    void sendReply();

    bool checkForError(const spi::Result& response);

    // Returns a non-nullptr notifier instance iff the underlying operation wants to be notified
    // when the sync phase is complete. Otherwise returns a nullptr shared_ptr.
    std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;

    /** Factory intended for tests; takes no throttle token argument. */
    static MessageTracker::UP
    createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender,
                     FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);

private:
    // Same as the public constructor, with an additional updateBucketInfo flag.
    MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
                   FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
                   ThrottleToken throttle_token);

    [[nodiscard]] bool count_result_as_failure() const noexcept;

    // NOTE: member declaration order is also the initialization order; keep it
    // in sync with the constructor definitions in the source file.
    bool                                     _sendReply;        // cleared by dontReply()
    bool                                     _updateBucketInfo;
    FileStorHandler::BucketLockInterface::SP _bucketLock;
    std::shared_ptr<api::StorageMessage>     _msg;
    ThrottleToken                            _throttle_token;
    spi::Context                             _context;
    const PersistenceUtil                   &_env;
    MessageSender                           &_replySender;
    FileStorThreadMetrics::Op               *_metric; // needs a better and thread safe solution
    std::shared_ptr<api::StorageReply>       _reply;            // empty until setReply() is called
    api::ReturnCode                          _result;
    framework::MilliSecTimer                 _timer;
};

class PersistenceUtil {
public:
    /** Lock the given bucket in the file stor handler. */
    struct LockResult {
        std::shared_ptr<FileStorHandler::BucketLockInterface> lock;
        LockResult() : lock() {}

        bool bucketExisted() const { return bool(lock); }
    };

    PersistenceUtil(const ServiceLayerComponent&, FileStorHandler& fileStorHandler,
                    FileStorThreadMetrics& metrics, spi::PersistenceProvider& provider);
    ~PersistenceUtil();

    StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) const {
        return _component.getBucketDatabase(bucketSpace);
    }
    spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const;
    void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket) const;
    void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info) const;
    LockResult lockAndGetDisk(const document::Bucket &bucket, StorBucketDatabase::Flag flags = StorBucketDatabase::NONE);
    api::BucketInfo getBucketInfo(const document::Bucket &bucket) const;
    const document::DocumentTypeRepo & getDocumentTypeRepo() const {
        if (componentHasChanged()) {
            reloadComponent();
        }
        return *_repos->documentTypeRepo;
    }
    const document::FieldSetRepo & getFieldSetRepo() const {
        if (componentHasChanged()) {
            reloadComponent();
        }
        return *_repos->fieldSetRepo;
    }

    static api::BucketInfo convertBucketInfo(const spi::BucketInfo&);
    static uint32_t convertErrorCode(const spi::Result& response);
public:
    const ServiceLayerComponent                &_component;
    FileStorHandler                            &_fileStorHandler;
    FileStorThreadMetrics                      &_metrics;  // Needs a better solution for speed and thread safety
    uint16_t                                    _nodeIndex;
private:
    bool componentHasChanged() const {
        return _lastGeneration != _component.getGeneration();
    }
    void reloadComponent() const;

    const document::BucketIdFactory                 &_bucketIdFactory;
    spi::PersistenceProvider                        &_spi;
    mutable uint64_t                                 _lastGeneration;
    mutable std::shared_ptr<StorageComponent::Repos> _repos;
};

} // storage