aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/communicationmanager.h
blob: 7a910336b13cc3a52a2d6316648b459f0d0d6c41 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * Class used for sending messages over the network.
 */

#pragma once

#include "communicationmanagermetrics.h"
#include "documentapiconverter.h"
#include "message_dispatcher.h"
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/mbusprot/storagecommand.h>
#include <vespa/storageapi/mbusprot/storagereply.h>
#include <vespa/messagebus/imessagehandler.h>
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/config-bucketspaces.h>
#include <map>
#include <queue>
#include <atomic>
#include <mutex>

// Forward declarations of external types. The mbus types below are only held
// through std::unique_ptr (or returned by reference) in this header, so their
// full definitions are not included here.
// NOTE(review): ConfigFetcher is not referenced in the visible part of this
// header - confirm whether this forward declaration is still needed.
namespace config {
    class ConfigFetcher;
}
namespace mbus {
    class RPCMessageBus;
    class SourceSession;
    class DestinationSession;
}
namespace storage {

// Forward declarations of the RPC-related types that CommunicationManager
// holds through std::unique_ptr members.
namespace rpc {
class ClusterControllerApiRpcService;
class MessageCodecProvider;
class SharedRpcResources;
class StorageApiRpcService;
}

// Further forward declarations. RPCRequestWrapper is used by
// StorageTransportContext and by CommunicationManager::sendDirectRPCReply().
// NOTE(review): BucketResolver, Visitor and VisitorThread are not referenced in
// the visible part of this header - confirm they are still required here.
struct BucketResolver;
class Visitor;
class VisitorThread;
class RPCRequestWrapper;

/**
 * Transport context (api::TransportContext) that takes ownership of the
 * originating request object, which is either a Document API message or an
 * RPC request wrapper depending on which constructor is used.
 *
 * NOTE(review): presumably only the member matching the constructor used is
 * non-null; the constructor bodies are not visible here - confirm in the .cpp.
 */
class StorageTransportContext : public api::TransportContext {
public:
    // Takes ownership of a Document API message.
    explicit StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
    // Takes ownership of an RPC request wrapper.
    explicit StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request);
    ~StorageTransportContext() override;

    // Publicly accessible owned request objects.
    std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg;
    std::unique_ptr<RPCRequestWrapper>            _request;
};

/**
 * Storage link used for sending and receiving messages over the network.
 *
 * Besides being a StorageLink, this class is:
 *  - a framework::Runnable (see run(), the "pump thread" entry point),
 *  - a message bus message handler and reply handler,
 *  - a metric update hook (privately inherited),
 *  - a MessageDispatcher (dispatch_sync() / dispatch_async()).
 *
 * The class is final and non-copyable. Only declarations are visible in this
 * header; behavioral details live in the corresponding implementation file.
 */
class CommunicationManager final
    : public StorageLink,
      public framework::Runnable,
      public mbus::IMessageHandler,
      public mbus::IReplyHandler,
      private framework::MetricUpdateHook,
      public MessageDispatcher
{
private:
    StorageComponent _component;
    CommunicationManagerMetrics _metrics; // Exposed read-only through metrics().

    // Owned RPC resources and services (types are forward declared above).
    std::unique_ptr<rpc::SharedRpcResources> _shared_rpc_resources;
    std::unique_ptr<rpc::StorageApiRpcService> _storage_api_rpc_service;
    std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service;
    std::unique_ptr<rpc::MessageCodecProvider> _message_codec_provider;
    // NOTE(review): presumably the queue drained by the pump thread in run() - confirm in the .cpp.
    Queue _eventQueue;
    // A protocol instance paired with a steady-clock timestamp.
    using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>;
    using EarlierProtocols = std::vector<EarlierProtocol>;
    // NOTE(review): presumably _earlierGenerationsLock guards _earlierGenerations - confirm in the .cpp.
    std::mutex       _earlierGenerationsLock;
    EarlierProtocols _earlierGenerations;

    // StorageLink lifecycle hooks.
    void onOpen() override;
    void onClose() override;
    void onFlush(bool downwards) override;

    void process(const std::shared_ptr<api::StorageMessage>& msg);

    using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
    using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;

    void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
    void receiveStorageReply(const std::shared_ptr<api::StorageReply>&);
    // Takes ownership of the message that could not be mapped to a bucket space.
    void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg,
                                             const vespalib::string& error_message);

    // Writes a textual form of the node state reply to the given stream.
    void serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, bool includeDescription) const;

    // NOTE(review): usage is not visible in this header; the meaning of the value 0 should be confirmed in the .cpp.
    static const uint64_t FORWARDED_MESSAGE = 0;

    // Config passed to the constructor (stored as an owned object).
    std::unique_ptr<CommunicationManagerConfig> _bootstrap_config;
    // Owned message bus instance and its sessions; see getMessageBus().
    std::unique_ptr<mbus::RPCMessageBus> _mbus;
    std::unique_ptr<mbus::DestinationSession> _messageBusSession;
    std::unique_ptr<mbus::SourceSession> _sourceSession;

    // Commands keyed by storage message id.
    // NOTE(review): presumably commands sent over message bus that await a reply, guarded by
    // _messageBusSentLock - confirm in the .cpp.
    std::mutex _messageBusSentLock;
    std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent;

    config::ConfigUri     _configUri;
    std::atomic<bool>     _closed;          // Atomic flag; NOTE(review): presumably set on close - confirm.
    DocumentApiConverter  _docApiConverter; // Also provides the priority converter, see getPriorityConverter().
    std::unique_ptr<framework::Thread> _thread;

    // framework::MetricUpdateHook implementation.
    void updateMetrics(const MetricLockGuard &) override;

    // Gives the test fixture access to private members.
    // NOTE(review): the previous comment referred to configure() for live reconfig testing, but the
    // only config entry point visible here is the public on_configure() - confirm whether this
    // friendship is still required.
    friend struct CommunicationManagerTest;

public:
    // Non-copyable.
    CommunicationManager(const CommunicationManager&) = delete;
    CommunicationManager& operator=(const CommunicationManager&) = delete;
    CommunicationManager(StorageComponentRegister& compReg,
                         const config::ConfigUri& configUri,
                         const CommunicationManagerConfig& bootstrap_config);
    ~CommunicationManager() override;

    // Entry point for applying a (new) communication manager config.
    void on_configure(const CommunicationManagerConfig& config);

    // MessageDispatcher overrides
    void dispatch_sync(std::shared_ptr<api::StorageMessage> msg) override;
    void dispatch_async(std::shared_ptr<api::StorageMessage> msg) override;

    // Dereferences _mbus without a null check; the message bus must have been created before this is called.
    mbus::RPCMessageBus& getMessageBus() { return *_mbus; }
    // Forwards to the priority converter owned by the Document API converter.
    const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); }

    /**
     * From StorageLink. Called when messages arrive from storage
     * modules. Will convert and dispatch messages to MessageServer
     */
    bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
    bool sendCommand(const std::shared_ptr<api::StorageCommand>& command);
    bool sendReply(const std::shared_ptr<api::StorageReply>& reply);
    // Reply paths for the two request kinds carried by StorageTransportContext.
    void sendDirectRPCReply(RPCRequestWrapper& request, const std::shared_ptr<api::StorageReply>& reply);
    void sendMessageBusReply(StorageTransportContext& context, const std::shared_ptr<api::StorageReply>& reply);

    // Pump thread
    void run(framework::ThreadHandle&) override;
    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    // mbus::IMessageHandler implementation; takes ownership of the incoming message.
    void handleMessage(std::unique_ptr<mbus::Message> msg) override;
    // Takes ownership of mbusMsg; msg is the storage command it corresponds to.
    void sendMessageBusMessage(const std::shared_ptr<api::StorageCommand>& msg,
                               std::unique_ptr<mbus::Message> mbusMsg, const mbus::Route& route);

    // mbus::IReplyHandler implementation; takes ownership of the incoming reply.
    void handleReply(std::unique_ptr<mbus::Reply> msg) override;
    void updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo> &repo);
    void updateBucketSpacesConfig(const BucketspacesConfig&);

    // Read-only access to this manager's metrics.
    const CommunicationManagerMetrics& metrics() const noexcept { return _metrics; }

    // Intended primarily for unit tests that fire up multiple nodes and must wait until all
    // nodes are cross-visible in Slobrok before progressing.
    [[nodiscard]] bool address_visible_in_slobrok(const api::StorageMessageAddress& addr) const noexcept;
};

} // storage