aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/apps/proton/proton.cpp
blob: de256ebf0d9b72d3717f9786fee18fbc964f3aa8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/searchcore/proton/server/proton.h>
#include <vespa/storage/storageserver/storagenode.h>
#include <vespa/metrics/metricmanager.h>
#include <vespa/searchvisitor/searchvisitor.h>
#include <vespa/vespalib/util/signalhandler.h>
#include <vespa/vespalib/util/programoptions.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/config/common/configcontext.h>
#include <vespa/fnet/transport.h>
#include <vespa/fastos/file.h>
#include <absl/debugging/failure_signal_handler.h>
#include <filesystem>
#include <iostream>
#include <thread>
#include <fcntl.h>

#include <vespa/log/log.h>
LOG_SETUP("proton");

using SIG = vespalib::SignalHandler;
using vespa::config::search::core::ProtonConfig;

/**
 * Values collected from the proton command line.
 */
struct Params
{
    // Bound to the "identity" option (node identity and config id).
    std::string identity{};
    // Bound to the "serviceidentity" option (service node identity and config id).
    std::string serviceidentity{};
    // Bound to the "subscribeTimeout" option; App::startAndRun turns this value
    // into a std::chrono::milliseconds duration.
    uint64_t subscribeTimeout{60};
    Params();
    ~Params();
};

// Both special members are defined out of line, relying on the in-class
// member initializers above for the initial state.
Params::Params() = default;
Params::~Params() = default;

/**
 * The proton application. App::main() is the only public entry point; the
 * remaining members are the individual startup steps it drives.
 */
class App
{
public:
    // Runs the application and returns the process exit code.
    int main(int argc, char **argv);
private:
    // Installs failure and shutdown signal handling for the process.
    static void setupSignals();
    // Applies default fadvise options taken from the environment (Linux only).
    static void setup_fadvise();
    // Parses the command line into a Params instance.
    Params parseParams(int argc, char **argv);
    // Brings proton (and optionally the service layer) up, waits for a stop
    // request and tears everything down again.
    void startAndRun(FNET_Transport & transport, int argc, char **argv);
};

void
App::setupSignals()
{
    absl::FailureSignalHandlerOptions opts;
    // Sanitizers set up their own signal handler, so we must ensure that the failure signal
    // handler calls this when it's done, or we won't get a proper report.
    opts.call_previous_handler = true;
    // Ideally we'd use an alternate stack to have well-defined reporting when a
    // thread runs out of stack space (infinite recursion bug etc.), but for some
    // reason this seems to negatively affect stack walking and give very incomplete
    // traces. So until this is resolved, use the thread's own stack.
    opts.use_alternate_stack = false;
    absl::InstallFailureSignalHandler(opts);

    // Install our own signal handlers _after_ the failure handler, as the sentinel uses
    // SIGTERM as a "friendly poke for shutdown" signal and the Abseil failure handler
    // always dumps stack when intercepting this signal (since it's considered fatal).
    SIG::PIPE.ignore();
    SIG::INT.hook();
    SIG::TERM.hook();
    SIG::enable_cross_thread_stack_tracing();
}

/**
 * Reads the VESPA_FADVISE_OPTIONS environment variable and, when it is set,
 * installs the advice values named in it as the default fadvise options for
 * FastOS files. Does nothing on non-Linux builds or when the variable is unset.
 */
void
App::setup_fadvise()
{
#ifdef __linux__
    const char * const requested = getenv("VESPA_FADVISE_OPTIONS");
    if (requested == nullptr) {
        return;
    }
    struct NamedAdvice {
        const char *name;
        int         advice;
    };
    // Each name is matched as a plain substring of the environment value.
    const NamedAdvice known_advice[] = {
        { "SEQUENTIAL", POSIX_FADV_SEQUENTIAL },
        { "RANDOM",     POSIX_FADV_RANDOM },
        { "WILLNEED",   POSIX_FADV_WILLNEED },
        { "DONTNEED",   POSIX_FADV_DONTNEED },
        { "NOREUSE",    POSIX_FADV_NOREUSE },
    };
    // NOTE(review): the POSIX_FADV_* values are combined with bitwise OR here;
    // confirm that setDefaultFAdviseOptions expects such a combination.
    int combined_options = 0;
    for (const NamedAdvice & candidate : known_advice) {
        if (strstr(requested, candidate.name) != nullptr) {
            combined_options |= candidate.advice;
        }
    }
    FastOS_FileInterface::setDefaultFAdviseOptions(combined_options);
#endif
}

/**
 * Parses the command line into a Params instance.
 *
 * On invalid arguments the syntax page is written to std::cerr and the
 * vespalib::InvalidCommandLineArgumentsException is rethrown to the caller.
 */
Params
App::parseParams(int argc, char **argv)
{
    Params parsed;
    std::string no_service_identity;
    vespalib::ProgramOptions options(argc, argv);
    options.setSyntaxMessage("proton -- the nextgen search core");
    options.addOption("identity", parsed.identity, "Node identity and config id");
    options.addOption("serviceidentity", parsed.serviceidentity, no_service_identity, "Service node identity and config id");
    options.addOption("subscribeTimeout", parsed.subscribeTimeout, UINT64_C(600000), "Initial config subscribe timeout");
    try {
        options.parse();
    } catch (const vespalib::InvalidCommandLineArgumentsException &) {
        // Show the usage text here, but leave the error reporting and the exit
        // code decision to the caller.
        options.writeSyntaxPage(std::cerr);
        throw;
    }
    return parsed;
}


using storage::spi::PersistenceProvider;

#include <vespa/storageserver/app/servicelayerprocess.h>

/**
 * Service layer process that uses a proton instance as its persistence
 * provider and registers the streaming search visitor as an external visitor.
 */
class ProtonServiceLayerProcess : public storage::ServiceLayerProcess {
    // Proton instance supplying persistence, metric manager and config generation.
    proton::Proton&         _proton;
    // Transport handed on to the search visitor factory.
    FNET_Transport&         _transport;
    // Connection spec handed on to the search visitor factory.
    vespalib::string        _file_distributor_connection_spec;
    // Not owned; nullptr until setMetricManager() has been called.
    metrics::MetricManager* _metricManager;
    // Weak handle to the factory created in add_external_visitors(); the owning
    // reference is stored in _externalVisitors.
    std::weak_ptr<streaming::SearchVisitorFactory> _search_visitor_factory;

public:
    ProtonServiceLayerProcess(const config::ConfigUri & configUri,
                              proton::Proton & proton, FNET_Transport& transport,
                              const vespalib::string& file_distributor_connection_spec,
                              const vespalib::HwInfo& hw_info);
    // Shuts the process down as part of destruction.
    ~ProtonServiceLayerProcess() override { shutdown(); }

    void shutdown() override;
    // Registers the metric manager (if one is set) with the component register.
    void setupProvider() override;
    // Returns proton's persistence provider.
    storage::spi::PersistenceProvider& getProvider() override;

    // Records the metric manager to register in setupProvider(); the pointer is not owned.
    void setMetricManager(metrics::MetricManager& mm) {
        // The service layer will call init(...) and stop() on the metric
        // manager provided. Current design is that rather than depending
        // on every component properly unregistering metrics and update
        // hooks, the service layer stops metric manager ahead of shutting
        // down component.
        _metricManager = &mm;
    }
    // Returns the lowest config generation among the service layer, proton and
    // (when available) the search visitor factory.
    int64_t getGeneration() const override;
    // Creates the search visitor factory and registers it as "searchvisitor".
    void add_external_visitors() override;
};

/**
 * Binds the service layer process to the given proton instance and transport.
 * The metric manager used by the service layer is the one owned by proton.
 */
ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & configUri,
                                                     proton::Proton & proton, FNET_Transport& transport,
                                                     const vespalib::string& file_distributor_connection_spec,
                                                     const vespalib::HwInfo& hw_info)
    : ServiceLayerProcess(configUri, hw_info),
      _proton(proton),
      _transport(transport),
      _file_distributor_connection_spec(file_distributor_connection_spec),
      // Same effect as calling setMetricManager(proton.getMetricManager()).
      _metricManager(&proton.getMetricManager()),
      _search_visitor_factory()
{
}

void
ProtonServiceLayerProcess::shutdown()
{
    ServiceLayerProcess::shutdown();
}

// Hands the metric manager over to the component register, provided one has
// been set through setMetricManager().
void
ProtonServiceLayerProcess::setupProvider()
{
    if (_metricManager == nullptr) {
        return;
    }
    _context.getComponentRegister().setMetricManager(*_metricManager);
}

// The persistence provider of this service layer is the one exposed by proton.
storage::spi::PersistenceProvider &
ProtonServiceLayerProcess::getProvider()
{
    storage::spi::PersistenceProvider & provider = _proton.getPersistence();
    return provider;
}

int64_t
ProtonServiceLayerProcess::getGeneration() const
{
    int64_t slGen = storage::ServiceLayerProcess::getGeneration();
    int64_t protonGen = _proton.getConfigGeneration();
    int64_t gen = std::min(slGen, protonGen);
    auto factory = _search_visitor_factory.lock();
    if (factory) {
        auto factory_gen = factory->get_oldest_config_generation();
        if (factory_gen.has_value()) {
            gen = std::min(gen, factory_gen.value());
        }
    }
    return gen;
}

/**
 * Creates the streaming search visitor factory and registers it under the
 * name "searchvisitor". Only a weak handle is kept locally; the registry entry
 * holds the owning reference.
 */
void
ProtonServiceLayerProcess::add_external_visitors()
{
    auto visitor_factory = std::make_shared<streaming::SearchVisitorFactory>(_configUri, &_transport, _file_distributor_connection_spec);
    _externalVisitors["searchvisitor"] = visitor_factory;
    _search_visitor_factory = visitor_factory;
}

namespace {

/**
 * Scoped watchdog: for as long as an instance is alive, a background thread
 * polls for SIGINT/SIGTERM and terminates the process immediately when either
 * has been received.
 */
class ExitOnSignal {
    std::atomic<bool> _stop;
    std::thread       _thread;

public:
    // Starts the polling thread.
    ExitOnSignal();
    // Asks the polling thread to stop and waits for it.
    ~ExitOnSignal();
    // Body of the polling thread.
    void operator()();
};

ExitOnSignal::ExitOnSignal()
    : _stop(false),
      _thread()
{
    // Launched from the constructor body, i.e. only after _stop is initialized.
    _thread = std::thread([this] { (*this)(); });
}

ExitOnSignal::~ExitOnSignal()
{
    _stop.store(true, std::memory_order_relaxed);
    _thread.join();
}

void
ExitOnSignal::operator()()
{
    for (;;) {
        if (_stop.load(std::memory_order_relaxed)) {
            return;
        }
        const bool stop_signal_seen = SIG::INT.check() || SIG::TERM.check();
        if (stop_signal_seen) {
            EV_STOPPING("proton", "unclean shutdown after interrupted init");
            // Terminate on the spot with exit status 0.
            std::_Exit(0);
        }
        std::this_thread::sleep_for(100ms);
    }
}

// Builds the transport config from the hardware concurrency divided by eight,
// limited to the range [1, 4]; that value is passed to fnet::TransportConfig.
fnet::TransportConfig
buildTransportConfig() {
    const uint32_t hardware_threads = std::thread::hardware_concurrency();
    const uint32_t scaled = hardware_threads / 8;
    const uint32_t capped = std::min(4u, scaled);
    return fnet::TransportConfig(std::max(1u, capped));
}

/**
 * Scope guard around an FNET_Transport: the transport is started on
 * construction and shut down on destruction.
 *
 * The constructor is explicit so that a TransportConfig never converts into a
 * running transport by accident, and copying is disabled because the
 * destructor shuts the wrapped transport down.
 */
class Transport {
public:
    explicit Transport(const fnet::TransportConfig & config)
        : _transport(config)
    {
        _transport.Start();
    }
    Transport(const Transport &) = delete;
    Transport & operator=(const Transport &) = delete;
    ~Transport() {
        // NOTE(review): `true` presumably requests a blocking shutdown that
        // waits for the transport to finish; confirm against FNET_Transport.
        _transport.ShutDown(true);
    }
    // Access to the wrapped transport; valid for the lifetime of this object.
    FNET_Transport & transport() { return _transport; }
private:
    FNET_Transport _transport;
};

}

/**
 * Starts proton (and, when a service identity is given, the service layer),
 * then blocks until a stop is requested and shuts everything down in order.
 *
 * @param transport transport used for config subscription, proton and the service layer
 * @param argc      argument count, forwarded to parseParams()
 * @param argv      argument vector; argv[0] is also passed to proton when present
 *
 * Exceptions thrown during startup (including the one rethrown by
 * parseParams()) are not handled here; they propagate to the caller.
 */
void
App::startAndRun(FNET_Transport & transport, int argc, char **argv) {
    Params params = parseParams(argc, argv);
    LOG(debug, "identity: '%s'", params.identity.c_str());
    LOG(debug, "serviceidentity: '%s'", params.serviceidentity.c_str());
    LOG(debug, "subscribeTimeout: '%" PRIu64 "'", params.subscribeTimeout);
    // The numeric option value is interpreted as milliseconds.
    std::chrono::milliseconds subscribeTimeout(params.subscribeTimeout);

    config::ConfigServerSpec configServerSpec(transport);
    config::ConfigUri identityUri(params.identity, std::make_shared<config::ConfigContext>(configServerSpec));
    auto protonUP = std::make_unique<proton::Proton>(transport, identityUri,
                                                     (argc > 0) ? argv[0] : "proton", subscribeTimeout);
    proton::Proton & proton = *protonUP;
    // First init step: yields the bootstrap config snapshot used below.
    proton::BootstrapConfig::SP configSnapshot = proton.init();
    if (proton.hasAbortedInit()) {
        EV_STOPPING("proton", "shutdown after aborted init");
    } else {
        const ProtonConfig &protonConfig = configSnapshot->getProtonConfig();
        vespalib::string basedir = protonConfig.basedir;
        std::filesystem::create_directories(std::filesystem::path(basedir));
        {
            // While this second init step runs, SIGINT/SIGTERM terminates the
            // process immediately (see ExitOnSignal); the guard is released
            // as soon as init returns.
            ExitOnSignal exit_on_signal;
            proton.init(configSnapshot);
        }
        vespalib::string file_distributor_connection_spec = configSnapshot->getFiledistributorrpcConfig().connectionspec;
        std::unique_ptr<ProtonServiceLayerProcess> spiProton;

        if ( ! params.serviceidentity.empty()) {
            // A service identity was given: run the service layer in this
            // process, on top of proton, using its own config id.
            spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton, transport,
                                                                    file_distributor_connection_spec, configSnapshot->getHwInfo());
            spiProton->setupConfig(subscribeTimeout);
            spiProton->createNode();
            EV_STARTED("servicelayer");
        } else {
            // No service layer in this process, so the metric manager is
            // initialized here instead.
            proton.getMetricManager().init(identityUri);
        }
        // The bootstrap snapshot is no longer needed once startup is done.
        configSnapshot.reset();
        EV_STARTED("proton");
        // Main loop: poll once per second until SIGINT/SIGTERM is seen or the
        // service layer node reports an attempted stop.
        while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) {
            std::this_thread::sleep_for(1000ms);
            if (spiProton && spiProton->configUpdated()) {
                // Apply new service layer config while the node is paused; the
                // guard resumes it when leaving this scope.
                storage::ResumeGuard guard(spiProton->getNode().pause());
                spiProton->updateConfig();
            }
        }
        // Ensure metric manager and state server are shut down before we start tearing
        // down any service layer components that they may end up transitively using.
        protonUP->shutdown_config_fetching_and_state_exposing_components_once();
        if (spiProton) {
            spiProton->getNode().requestShutdown("controlled shutdown");
            spiProton->shutdown();
            EV_STOPPING("servicelayer", "clean shutdown");
        }
        // Proton is destroyed here, before spiProton, which goes out of scope
        // at the end of this block.
        protonUP.reset();
        EV_STOPPING("proton", "clean shutdown");
    }
}

/**
 * Application entry point: sets up signal handling and fadvise defaults,
 * starts the transport and runs proton until it is stopped.
 *
 * @return 0 after a completed run; 1 when startup fails with one of the
 *         anticipated error categories handled below (each is logged as a
 *         warning).
 *
 * A vespalib::IllegalStateException is logged as an error and rethrown. Any
 * other exception type is not caught here and propagates to the caller.
 */
int
App::main(int argc, char **argv)
{
    try {
        setupSignals();
        setup_fadvise();
        // The transport lives inside the try block so that it has been shut
        // down and destroyed before the final log statement below is reached.
        Transport transport(buildTransportConfig());
        startAndRun(transport.transport(), argc, argv);
    } catch (const vespalib::InvalidCommandLineArgumentsException &e) {
        LOG(warning, "Invalid commandline arguments: '%s'", e.what());
        return 1;
    } catch (const config::ConfigTimeoutException &e) {
        LOG(warning, "Error subscribing to initial config: '%s'", e.what());
        return 1;
    } catch (const vespalib::PortListenException &e) {
        LOG(warning, "Failed listening to a network port(%d) with protocol(%s): '%s'",
                   e.get_port(), e.get_protocol().c_str(), e.what());
        return 1;
    } catch (const vespalib::NetworkSetupFailureException & e) {
        LOG(warning, "Network failure: '%s'", e.what());
        return 1;
    } catch (const config::InvalidConfigException & e) {
        LOG(warning, "Invalid config failure: '%s'", e.what());
        return 1;
    } catch (const vespalib::IllegalStateException & e) {
        // Not an anticipated startup failure: record it and let it propagate.
        LOG(error, "Unknown IllegalStateException: '%s'", e.what());
        throw;
    }
    // Message previously ended with a stray, unbalanced ')'.
    LOG(debug, "Fully stopped, all destructors run.");
    return 0;
}

// Program entry point: delegates to App::main and uses its result as the exit code.
int main(int argc, char **argv) {
    return App().main(argc, argv);
}