// path: root/storage/src/vespa/storage/storageserver/communicationmanager.h (blob: a0ae4bf3b43aa4fdfa6cb30c97da3e430bb41ece)
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class CommunicationManager
 * @ingroup storageserver
 *
 * @brief Class used for sending messages over the network.
 *
 * @version $Id$
 */

#pragma once

#include "communicationmanagermetrics.h"
#include "documentapiconverter.h"
#include "message_enqueuer.h"
#include <vespa/storage/common/storagelink.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/config/config-stor-communicationmanager.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageapi/mbusprot/storagecommand.h>
#include <vespa/storageapi/mbusprot/storagereply.h>
#include <vespa/messagebus/imessagehandler.h>
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/config/helper/configfetcher.h>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/config/subscription/configuri.h>
#include <map>
#include <queue>
#include <atomic>
#include <mutex>
#include <vespa/config-bucketspaces.h>

// Forward declarations of the message bus types that CommunicationManager
// holds through std::unique_ptr members (_mbus, _sourceSession,
// _messageBusSession), so their full definitions are not needed in this header.
namespace mbus {
    class RPCMessageBus;
    class SourceSession;
    class DestinationSession;
}
namespace storage {

// Forward declarations of storage types. FNetListener and RPCRequestWrapper are
// used by pointer/reference further down in this header; the remaining names
// are not referenced in this header itself.
struct BucketResolver;
class VisitorMbusSession;
class Visitor;
class VisitorThread;
class FNetListener;
class RPCRequestWrapper;

// Queue of shared api::StorageMessage pointers: a std::queue paired with a
// vespalib::Monitor. All member functions are defined out of line, so the
// exact locking and wake-up behaviour cannot be confirmed from this declaration.
class Queue {
private:
    using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>;
    QueueType _queue;
    // NOTE(review): presumably guards _queue and is what getNext() waits on — confirm in the implementation.
    vespalib::Monitor _queueMonitor;

public:
    Queue();
    ~Queue();

    /**
     * Fetches the next event from the event queue.
     * @param   msg             Output parameter receiving the next event.
     * @param   timeout         Milliseconds to wait if the queue is empty
     *                          (0 = don't wait, -1 = wait forever).
     * @return  NOTE(review): presumably true if msg was set and false if the
     *          queue was (still) empty — the original wording was ambiguous;
     *          confirm against the implementation.
     */
    bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout);

    /**
     * Enqueue msg in FIFO order.
     * @param   msg             The message to append; taken by value.
     */
    void enqueue(std::shared_ptr<api::StorageMessage> msg);

    /**
     * Signal queue monitor.
     * NOTE(review): presumably wakes a thread blocked in getNext() — confirm.
     */
    void signal();

    /** Returns the size of the queue (presumably the number of queued messages — confirm). */
    size_t size() const;
};

// api::TransportContext subclass that takes ownership of one of three kinds of
// object: a documentapi::DocumentMessage, an mbusprot::StorageCommand or an
// RPCRequestWrapper. There is one constructor and one public owning member per
// kind.
// NOTE(review): presumably each constructor fills only its matching member and
// leaves the other two null — confirm in the implementation.
// NOTE(review): the single-argument constructors are not `explicit`, so they
// also act as implicit conversions from the respective std::unique_ptr types.
class StorageTransportContext : public api::TransportContext {
public:
    // Each constructor takes ownership of the object passed in.
    StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
    StorageTransportContext(std::unique_ptr<mbusprot::StorageCommand> msg);
    StorageTransportContext(std::unique_ptr<RPCRequestWrapper> request);
    ~StorageTransportContext();

    // Publicly accessible owned objects, one per constructor overload above.
    std::unique_ptr<documentapi::DocumentMessage> _docAPIMsg;
    std::unique_ptr<mbusprot::StorageCommand>     _storageProtocolMsg;
    std::unique_ptr<RPCRequestWrapper>            _request;
};

// Storage link used for sending messages over the network.
//
// Besides being a StorageLink, the class is a framework::Runnable (see run()),
// a message bus message handler and reply handler, and a MessageEnqueuer.
// Privately it is also a config fetcher callback for
// StorCommunicationmanagerConfig and a framework::MetricUpdateHook.
//
// Instances cannot be copied or copy-assigned.
class CommunicationManager final
    : public StorageLink,
      public framework::Runnable,
      private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
      public mbus::IMessageHandler,
      public mbus::IReplyHandler,
      private framework::MetricUpdateHook,
      public MessageEnqueuer
{
private:
    // Copying is disallowed. The operations are explicitly deleted (rather
    // than declared private and left undefined) so that any attempted copy,
    // including one from a member function or the friend test struct, is
    // rejected at compile time instead of at link time.
    CommunicationManager(const CommunicationManager&) = delete;
    CommunicationManager& operator=(const CommunicationManager&) = delete;

    StorageComponent _component;
    CommunicationManagerMetrics _metrics;

    std::unique_ptr<FNetListener> _listener;
    Queue _eventQueue;
    // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
    std::unique_ptr<config::ConfigFetcher> _configFetcher;
    // A protocol instance paired with a timestamp, kept in _earlierGenerations.
    // NOTE(review): presumably these are protocol instances from earlier config
    // generations with the time they were replaced — confirm in the implementation.
    using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>;
    using EarlierProtocols = std::vector<EarlierProtocol>;
    // NOTE(review): presumably guards _earlierGenerations — confirm.
    std::mutex       _earlierGenerationsLock;
    EarlierProtocols _earlierGenerations;

    // Open/close hooks (declared as overrides of a base class virtual).
    void onOpen() override;
    void onClose() override;

    void process(const std::shared_ptr<api::StorageMessage>& msg);

    using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
    using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;

    void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
    // Receives a new communication manager config; takes ownership of it.
    // Also reachable from CommunicationManagerTest (see friend declaration below).
    void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
    void receiveStorageReply(const std::shared_ptr<api::StorageReply>&);
    void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg,
                                             const vespalib::string& error_message);

    void serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, bool includeDescription,
                            bool includeDiskDescription, bool useOldFormat) const;

    static const uint64_t FORWARDED_MESSAGE = 0;

    std::unique_ptr<mbus::RPCMessageBus> _mbus;
    std::unique_ptr<mbus::DestinationSession> _messageBusSession;
    std::unique_ptr<mbus::SourceSession> _sourceSession;
    uint32_t _count;

    // NOTE(review): presumably guards _messageBusSent — confirm.
    vespalib::Lock _messageBusSentLock;
    // Storage commands keyed by storage message id.
    std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent;

    config::ConfigUri _configUri;
    std::atomic<bool> _closed;
    DocumentApiConverter _docApiConverter;
    framework::Thread::UP _thread;

    // Metric update callback; requires the caller to hold the metric lock guard.
    void updateMetrics(const MetricLockGuard &) override;

    // Test needs access to configure() for live reconfig testing.
    friend struct CommunicationManagerTest;

public:
    /**
     * @param compReg    Component register (presumably used to set up _component — confirm).
     * @param configUri  Config URI (presumably stored in _configUri — confirm).
     */
    CommunicationManager(StorageComponentRegister& compReg,
                         const config::ConfigUri & configUri);
    ~CommunicationManager();

    // MessageEnqueuer interface.
    void enqueue(std::shared_ptr<api::StorageMessage> msg) override;
    // Returns the message bus instance. Asserts that _mbus has been set, so it
    // must not be called before the message bus exists.
    // NOTE(review): `assert` is used here but <cassert> is not among this
    // header's direct includes; it is presumably pulled in transitively.
    mbus::RPCMessageBus& getMessageBus() { assert(_mbus.get()); return *_mbus; }
    // Forwards to the priority converter held by the document API converter.
    const PriorityConverter& getPriorityConverter() const { return _docApiConverter.getPriorityConverter(); }

    /**
     * From StorageLink. Called when messages arrive from storage
     * modules. Will convert and dispatch messages to MessageServer
     */
    bool onUp(const std::shared_ptr<api::StorageMessage>&) override;
    bool sendCommand(const std::shared_ptr<api::StorageCommand>& command);
    bool sendReply(const std::shared_ptr<api::StorageReply>& reply);
    void sendDirectRPCReply(RPCRequestWrapper& request, const std::shared_ptr<api::StorageReply>& reply);
    void sendMessageBusReply(StorageTransportContext& context, const std::shared_ptr<api::StorageReply>& reply);

    // Pump thread
    void run(framework::ThreadHandle&) override;
    void print(std::ostream& out, bool verbose, const std::string& indent) const override;

    // mbus::IMessageHandler interface; takes ownership of the incoming message.
    void handleMessage(std::unique_ptr<mbus::Message> msg) override;
    void sendMessageBusMessage(const std::shared_ptr<api::StorageCommand>& msg,
                               std::unique_ptr<mbus::Message> mbusMsg, const mbus::Route& route);

    // mbus::IReplyHandler interface; takes ownership of the incoming reply.
    void handleReply(std::unique_ptr<mbus::Reply> msg) override;
    void updateMessagebusProtocol(const std::shared_ptr<const document::DocumentTypeRepo> &repo);
    void updateBucketSpacesConfig(const BucketspacesConfig&);

    // Read-only access to this manager's metrics.
    const CommunicationManagerMetrics& metrics() const noexcept { return _metrics; }
};

} // storage