// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::FileStorManager * @ingroup filestorage * * @version $Id$ */ #pragma once #include "filestorhandler.h" #include "service_layer_host_info_reporter.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace config { class ConfigUri; class ConfigFetcher; } namespace vespalib { class IDestructorCallback; } namespace storage { namespace api { class ReturnCode; class StorageReply; class BucketCommand; } namespace spi { struct PersistenceProvider; } class ContentBucketSpace; struct FileStorManagerTest; class ReadBucketList; class BucketOwnershipNotifier; class AbortBucketOperationsCommand; struct DoneInitializeHandler; class HostInfo; class PersistenceHandler; struct FileStorMetrics; class ProviderErrorWrapper; class FileStorManager : public StorageLinkQueued, public framework::HtmlStatusReporter, public StateListener, private config::IFetcherCallback, public MessageSender, public spi::BucketExecutor { ServiceLayerComponentRegister & _compReg; ServiceLayerComponent _component; std::unique_ptr _provider; DoneInitializeHandler & _init_handler; const document::BucketIdFactory & _bucketIdFactory; std::vector> _persistenceHandlers; std::vector> _threads; std::unique_ptr _bucketOwnershipNotifier; std::unique_ptr _config; std::unique_ptr _configFetcher; bool _use_async_message_handling_on_schedule; std::shared_ptr _metrics; // Spray&pray over a few different size classes std::unique_ptr _mem_trap_1; std::unique_ptr _mem_trap_2; std::unique_ptr _mem_trap_3; std::unique_ptr _mem_trap_4; std::unique_ptr _filestorHandler; std::unique_ptr _sequencedExecutor; bool _closed; std::mutex _lock; std::unique_ptr _bucketExecutorRegistration; ServiceLayerHostInfoReporter _host_info_reporter; std::unique_ptr _resource_usage_listener_registration; public: FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&, ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&); FileStorManager(const FileStorManager &) = delete; FileStorManager& operator=(const FileStorManager &) = delete; ~FileStorManager() override; void print(std::ostream& out, bool verbose, const std::string& indent) const override; FileStorHandler& getFileStorHandler() noexcept { return *_filestorHandler; }; spi::PersistenceProvider& getPersistenceProvider() noexcept { return *_provider; } ProviderErrorWrapper& error_wrapper() noexcept; void handleNewState() noexcept override; // Must be called exactly once at startup _before_ storage chain is opened. // This function expects that no external messages may arrive prior to, or // concurrently with this call, such as client operations or cluster controller // node state requests. // By ensuring that this function is called prior to chain opening, this invariant // shall be upheld since no RPC/MessageBus endpoints have been made available // yet at that point in time. // Must always be called _before_ complete_internal_initialization() void initialize_bucket_databases_from_provider(); // Tag node internally as having completed initialization. Updates reported state // (although this will not be communicated out of the process until the // CommunicationManager thread has been fired up). void complete_internal_initialization(); const FileStorMetrics& get_metrics() const { return *_metrics; } private: void configure(std::unique_ptr config) override; PersistenceHandler & createRegisteredHandler(const ServiceLayerComponent & component); VESPA_DLL_LOCAL PersistenceHandler & getThreadLocalHandler(); void replyWithBucketNotFound(api::StorageMessage&, const document::Bucket&); void replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket, api::ReturnCode::Result returnCode, vespalib::stringref reason); StorBucketDatabase::WrappedEntry ensureConsistentBucket( const document::Bucket& bucket, api::StorageMessage& msg, const char* callerId); bool validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry&); bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::Bucket&); StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::Bucket&); StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*); bool handlePersistenceMessage(const std::shared_ptr&); // Document operations bool onPut(const std::shared_ptr&) override; bool onUpdate(const std::shared_ptr&) override; bool onGet(const std::shared_ptr&) override; bool onRemove(const std::shared_ptr&) override; bool onStatBucket(const std::shared_ptr&) override; // Bucket operations bool onRemoveLocation(const std::shared_ptr&) override; bool onCreateBucket(const std::shared_ptr&) override; bool onDeleteBucket(const std::shared_ptr&) override; bool onMergeBucket(const std::shared_ptr&) override; bool onGetBucketDiff(const std::shared_ptr&) override; bool onGetBucketDiffReply(const std::shared_ptr&) override; bool onApplyBucketDiff(const std::shared_ptr&) override; bool onApplyBucketDiffReply(const std::shared_ptr&) override; bool onJoinBuckets(const std::shared_ptr&) override; bool onSplitBucket(const std::shared_ptr&) override; bool onSetBucketState(const std::shared_ptr&) override; bool onNotifyBucketChangeReply(const std::shared_ptr&) override { return true; } // Other bool onInternal(const std::shared_ptr&) override; bool onInternalReply(const std::shared_ptr&) override; void handleAbortBucketOperations(const std::shared_ptr&); void sendCommand(const std::shared_ptr&) override; void sendReply(const std::shared_ptr&) override; void sendReplyDirectly(const std::shared_ptr&) override; void sendUp(const std::shared_ptr&) override; void onClose() override; void onFlush(bool downwards) override; void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; void storageDistributionChanged() override; [[nodiscard]] static bool should_deactivate_buckets(const ContentBucketSpace& space, bool node_up_in_space, bool maintenance_in_all_spaces) noexcept; [[nodiscard]] bool maintenance_in_all_spaces(const lib::Node& node) const noexcept; void maybe_log_received_cluster_state(); void updateState(); void propagateClusterStates(); void update_reported_state_after_db_init(); void execute(const spi::Bucket &bucket, std::unique_ptr task) override; }; } // storage