aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
blob: cf004c588201a6915feee87bde043b577120ec5c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class storage::FileStorManager
 * @ingroup filestorage
 *
 * @version $Id$
 */

#pragma once

#include "filestorhandler.h"
#include "service_layer_host_info_reporter.h"
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/common/statusmessages.h>
#include <vespa/storage/common/storagelinkqueued.h>
#include <vespa/config-stor-filestor.h>
#include <vespa/storage/persistence/diskthread.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/vespalib/util/memory_trap.h>

namespace config {
    class ConfigUri;
    class ConfigFetcher;
}
namespace vespalib { class IDestructorCallback; }

namespace storage {
namespace api {
    class ReturnCode;
    class StorageReply;
    class BucketCommand;
}
namespace spi { struct PersistenceProvider; }

class ContentBucketSpace;
struct FileStorManagerTest;
class ReadBucketList;
class BucketOwnershipNotifier;
class AbortBucketOperationsCommand;
struct DoneInitializeHandler;
class HostInfo;
class PersistenceHandler;
struct FileStorMetrics;
class ProviderErrorWrapper;

class FileStorManager : public StorageLinkQueued,
                        public framework::HtmlStatusReporter,
                        public StateListener,
                        private config::IFetcherCallback<vespa::config::content::StorFilestorConfig>,
                        public MessageSender,
                        public spi::BucketExecutor
{
    ServiceLayerComponentRegister             & _compReg;
    ServiceLayerComponent                       _component;
    std::unique_ptr<spi::PersistenceProvider>   _provider;
    DoneInitializeHandler                     & _init_handler;
    const document::BucketIdFactory           & _bucketIdFactory;

    std::vector<std::unique_ptr<PersistenceHandler>> _persistenceHandlers;
    std::vector<std::unique_ptr<DiskThread>>         _threads;
    std::unique_ptr<BucketOwnershipNotifier>         _bucketOwnershipNotifier;

    std::unique_ptr<vespa::config::content::StorFilestorConfig> _config;
    std::unique_ptr<config::ConfigFetcher> _configFetcher;
    bool                  _use_async_message_handling_on_schedule;
    std::shared_ptr<FileStorMetrics> _metrics;
    // Spray&pray over a few different size classes
    std::unique_ptr<vespalib::HeapMemoryTrap> _mem_trap_1;
    std::unique_ptr<vespalib::HeapMemoryTrap> _mem_trap_2;
    std::unique_ptr<vespalib::HeapMemoryTrap> _mem_trap_3;
    std::unique_ptr<vespalib::HeapMemoryTrap> _mem_trap_4;
    std::unique_ptr<FileStorHandler> _filestorHandler;
    std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor;

    bool       _closed;
    std::mutex _lock;
    std::unique_ptr<vespalib::IDestructorCallback> _bucketExecutorRegistration;
    ServiceLayerHostInfoReporter                   _host_info_reporter;
    std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration;

public:
    FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&,
                    ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&);
    FileStorManager(const FileStorManager &) = delete;
    FileStorManager& operator=(const FileStorManager &) = delete;

    ~FileStorManager() override;

    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    FileStorHandler& getFileStorHandler() noexcept {
        return *_filestorHandler;
    };

    spi::PersistenceProvider& getPersistenceProvider() noexcept {
        return *_provider;
    }
    ProviderErrorWrapper& error_wrapper() noexcept;

    void handleNewState() noexcept override;

    // Must be called exactly once at startup _before_ storage chain is opened.
    // This function expects that no external messages may arrive prior to, or
    // concurrently with this call, such as client operations or cluster controller
    // node state requests.
    // By ensuring that this function is called prior to chain opening, this invariant
    // shall be upheld since no RPC/MessageBus endpoints have been made available
    // yet at that point in time.
    // Must always be called _before_ complete_internal_initialization()
    void initialize_bucket_databases_from_provider();
    // Tag node internally as having completed initialization. Updates reported state
    // (although this will not be communicated out of the process until the
    // CommunicationManager thread has been fired up).
    void complete_internal_initialization();

    const FileStorMetrics& get_metrics() const { return *_metrics; }

private:
    void configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) override;
    PersistenceHandler & createRegisteredHandler(const ServiceLayerComponent & component);
    VESPA_DLL_LOCAL PersistenceHandler & getThreadLocalHandler();

    void replyWithBucketNotFound(api::StorageMessage&, const document::Bucket&);

    void replyDroppedOperation(api::StorageMessage& msg,
                               const document::Bucket& bucket,
                               api::ReturnCode::Result returnCode,
                               vespalib::stringref reason);

    StorBucketDatabase::WrappedEntry ensureConsistentBucket(
            const document::Bucket& bucket,
            api::StorageMessage& msg,
            const char* callerId);

    bool validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry&);
    bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::Bucket&);

    StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::Bucket&);
    StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*);
    bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&);

    // Document operations
    bool onPut(const std::shared_ptr<api::PutCommand>&) override;
    bool onUpdate(const std::shared_ptr<api::UpdateCommand>&) override;
    bool onGet(const std::shared_ptr<api::GetCommand>&) override;
    bool onRemove(const std::shared_ptr<api::RemoveCommand>&) override;
    bool onStatBucket(const std::shared_ptr<api::StatBucketCommand>&) override;

    // Bucket operations
    bool onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>&) override;
    bool onCreateBucket(const std::shared_ptr<api::CreateBucketCommand>&) override;
    bool onDeleteBucket(const std::shared_ptr<api::DeleteBucketCommand>&) override;
    bool onMergeBucket(const std::shared_ptr<api::MergeBucketCommand>&) override;
    bool onGetBucketDiff(const std::shared_ptr<api::GetBucketDiffCommand>&) override;
    bool onGetBucketDiffReply(const std::shared_ptr<api::GetBucketDiffReply>&) override;
    bool onApplyBucketDiff(const std::shared_ptr<api::ApplyBucketDiffCommand>&) override;
    bool onApplyBucketDiffReply(const std::shared_ptr<api::ApplyBucketDiffReply>&) override;
    bool onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>&) override;
    bool onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>&) override;
    bool onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>&) override;
    bool onNotifyBucketChangeReply(const std::shared_ptr<api::NotifyBucketChangeReply>&) override { return true; }

    // Other
    bool onInternal(const std::shared_ptr<api::InternalCommand>&) override;
    bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override;

    void handleAbortBucketOperations(const std::shared_ptr<AbortBucketOperationsCommand>&);
    void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
    void sendReply(const std::shared_ptr<api::StorageReply>&) override;
    void sendReplyDirectly(const std::shared_ptr<api::StorageReply>&) override;
    void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
    void onClose() override;
    void onFlush(bool downwards) override;
    void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
    void storageDistributionChanged() override;
    [[nodiscard]] static bool should_deactivate_buckets(const ContentBucketSpace& space,
                                                        bool node_up_in_space,
                                                        bool maintenance_in_all_spaces) noexcept;
    [[nodiscard]] bool maintenance_in_all_spaces(const lib::Node& node) const noexcept;
    void maybe_log_received_cluster_state();
    void updateState();
    void propagateClusterStates();
    void update_reported_state_after_db_init();

    void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override;
};

} // storage