aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/servicelayernode.cpp
blob: 846d6ed09bf2f27aeb5b21eafc0facf2c2dbe809 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "servicelayernode.h"
#include "bouncer.h"
#include "communicationmanager.h"
#include "changedbucketownershiphandler.h"
#include "mergethrottler.h"
#include "statemanager.h"
#include "priorityconverter.h"
#include "service_layer_error_listener.h"
#include <vespa/storage/common/i_storage_chain_builder.h>
#include <vespa/storage/visiting/messagebusvisitormessagesession.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/persistence/spi/exceptions.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/config/common/exceptions.h>

#include <vespa/log/log.h>
LOG_SETUP(".node.servicelayer");

namespace storage {

ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context,
                                   ApplicationGenerationFetcher& generationFetcher,
                                   spi::PersistenceProvider& persistenceProvider,
                                   const VisitorFactory::Map& externalVisitors)
    : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()),
      _context(context),
      _persistenceProvider(persistenceProvider),
      _externalVisitors(externalVisitors),
      _bucket_manager(nullptr),
      _fileStorManager(nullptr),
      _init_has_been_called(false)
{
}

// Writes a "metrics" object containing a "values" array populated by the bucket manager.
// Writes nothing at all while _bucket_manager is still null (in this file it is only
// assigned in createChain()).
void
ServiceLayerNode::report(vespalib::JsonStream &stream) const
{
    if (_bucket_manager == nullptr) {
        return;
    }
    using namespace vespalib::jsonstream;
    stream << "metrics" << Object() << "values" << Array();
    _bucket_manager->report(stream);
    // Close the "values" array, then the "metrics" object.
    stream << End() << End();
}

void ServiceLayerNode::init()
{
    assert( ! _init_has_been_called);
    _init_has_been_called = true;
    spi::Result initResult(_persistenceProvider.initialize());
    if (initResult.hasError()) {
        LOG(error, "Failed to initialize persistence provider: %s", initResult.toString().c_str());
        throw spi::HandledException("Failed provider init: " + initResult.toString(), VESPA_STRLOC);
    }

    try{
        initialize(*this);
    } catch (spi::HandledException& e) {
        requestShutdown("Failed to initialize: " + e.getMessage());
        throw;
    } catch (const config::ConfigTimeoutException &e) {
        LOG(warning, "Error subscribing to initial config: '%s'", e.what());
        throw;
    } catch (const vespalib::NetworkSetupFailureException & e) {
        LOG(warning, "Network failure: '%s'", e.what());
        throw;
    } catch (const vespalib::Exception & e) {
        LOG(error, "Caught exception %s during startup. Calling destruct functions in hopes of dying gracefully.",
            e.getMessage().c_str());
        requestShutdown("Failed to initialize: " + e.getMessage());
        throw;
    }
}

// Tears the node down via shutdown(). init() is required to have run first.
ServiceLayerNode::~ServiceLayerNode()
{
    assert(_init_has_been_called);
    this->shutdown();
}

void
ServiceLayerNode::subscribeToConfigs()
{
    StorageNode::subscribeToConfigs();
    _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext()));
}

// Counterpart of subscribeToConfigs(): drops the base subscriptions, then releases the
// config fetcher created there.
void
ServiceLayerNode::removeConfigSubscriptions()
{
    StorageNode::removeConfigSubscriptions();
    _configFetcher = nullptr;
}

void
ServiceLayerNode::initializeNodeSpecific()
{
    // Give node state to mount point initialization, such that we can
    // get capacity set in reported node state.
    NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
    lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());

    ns.setCapacity(_serverConfig->nodeCapacity);
    LOG(debug, "Adjusting reported node state to include capacity: %s", ns.toString().c_str());
    _component->getStateUpdater().setReportedNodeState(ns);
}

#define DIFFER(a) (!(oldC.a == newC.a))
#define ASSIGN(a) { oldC.a = newC.a; updated = true; }

void
ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
    if (_newServerConfig) {
        bool updated = false;
        vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
        StorServerConfig& newC(*_newServerConfig);
        {
            updated = false;
            NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
            lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
            if (DIFFER(nodeCapacity)) {
                LOG(info, "Live config update: Updating node capacity from %f to %f.",
                    oldC.nodeCapacity, newC.nodeCapacity);
                ASSIGN(nodeCapacity);
                ns.setCapacity(newC.nodeCapacity);
            }
            if (updated) {
                _serverConfig.reset(new vespa::config::content::core::StorServerConfig(oldC));
                _component->getStateUpdater().setReportedNodeState(ns);
            }
        }
    }
    StorageNode::handleLiveConfigUpdate(initGuard);
}

// Creates a message-bus backed visitor session. The session is also registered as the
// reply handler of the underlying mbus source session.
// NOTE(review): an empty IThrottlePolicy::SP is passed, presumably disabling source-session
// throttling; confirm against mbus::SourceSessionParams.
VisitorMessageSession::UP
ServiceLayerNode::createSession(Visitor& visitor, VisitorThread& thread)
{
    auto session = std::make_unique<MessageBusVisitorMessageSession>(visitor, thread);

    mbus::SourceSessionParams params;
    params.setThrottlePolicy(mbus::IThrottlePolicy::SP());
    params.setReplyHandler(*session);

    auto& messageBus = _communicationManager->getMessageBus().getMessageBus();
    session->setSourceSession(messageBus.createSourceSession(params));
    return VisitorMessageSession::UP(std::move(session));
}

// Maps a storage-level priority to a document API priority, using the communication
// manager's priority converter.
documentapi::Priority::Value
ServiceLayerNode::toDocumentPriority(uint8_t storagePriority) const
{
    auto&& converter = _communicationManager->getPriorityConverter();
    return converter.toDocumentPriority(storagePriority);
}

// Builds the service layer storage link chain. Links are appended to the builder in the
// exact order below, and this order is significant:
//   CommunicationManager, Bouncer, MergeThrottler, ChangedBucketOwnershipHandler,
//   BucketManager, VisitorManager, ModifiedBucketChecker, FileStorManager, StateManager.
// As a side effect it records non-owning pointers to the communication manager, bucket
// manager and filestor manager, which later members of this class dereference. It also
// registers an SPI error listener with the filestor manager.
void
ServiceLayerNode::createChain(IStorageChainBuilder &builder)
{
    // All links register with the same component register.
    ServiceLayerComponentRegister& compReg(_context.getComponentRegister());

    auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri);
    _communicationManager = communication_manager.get();
    builder.add(std::move(communication_manager));
    builder.add(std::make_unique<Bouncer>(compReg, _configUri));
    auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg);
    auto merge_throttler = merge_throttler_up.get();
    builder.add(std::move(merge_throttler_up));
    builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg));
    auto bucket_manager = std::make_unique<BucketManager>(_configUri, compReg);
    _bucket_manager = bucket_manager.get();
    builder.add(std::move(bucket_manager));
    builder.add(std::make_unique<VisitorManager>(_configUri, compReg,
                                                 static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors));
    builder.add(std::make_unique<ModifiedBucketChecker>(compReg, _persistenceProvider, _configUri));
    // The state manager is taken over from StorageNode. Its host info is handed to the
    // FileStorManager before the state manager itself is moved into the chain.
    auto state_manager = releaseStateManager();
    auto filestor_manager = std::make_unique<FileStorManager>(_configUri, _persistenceProvider, compReg,
                                                              getDoneInitializeHandler(), state_manager->getHostInfo());
    _fileStorManager = filestor_manager.get();
    builder.add(std::move(filestor_manager));
    builder.add(std::move(state_manager));

    // Lifetimes of all referenced components shall outlive the last call going
    // through the SPI, as queues are flushed and worker threads joined when
    // the storage link chain is closed prior to destruction.
    auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler);
    _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener));
}

// Pauses the filestor handler. The returned guard comes straight from
// FileStorHandler::pause(). Requires createChain() to have set _fileStorManager.
ResumeGuard
ServiceLayerNode::pause()
{
    auto& handler = _fileStorManager->getFileStorHandler();
    return handler.pause();
}

void ServiceLayerNode::perform_post_chain_creation_init_steps() {
    // Both pointers are assigned in createChain(), which must have run first.
    assert(_fileStorManager);
    assert(_bucket_manager);
    // After initialization, the node will immediately start communicating with the cluster
    // controller, exchanging host info. This host info contains a subset snapshot of the active
    // metrics, which includes the total bucket count, doc count etc. It is critical that
    // we must never report back host info _prior_ to having run at least one full sweep of
    // the bucket database, lest we risk transiently reporting zero buckets held by the
    // content node. Doing so could cause orchestration logic to perform operations based
    // on erroneous assumptions.
    // To avoid this, we explicitly force a full DB sweep and metric update prior to reporting
    // the node as up. Since this function is called prior to the CommunicationManager thread
    // being started, any CC health pings should also always happen after this init step.
    auto& filestor = *_fileStorManager;
    // Step 1: populate the bucket databases from the persistence provider.
    filestor.initialize_bucket_databases_from_provider();
    // Step 2: full DB sweep so the metrics reflect the real bucket contents.
    _bucket_manager->force_db_sweep_and_metric_update();
    // Step 3: only now finish internal initialization.
    filestor.complete_internal_initialization();
}

} // storage