aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
blob: dae15992f28d9c360d2ea0cf4597d059bbe09738 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "translogserverapp.h"
#include <vespa/config/subscription/configuri.h>
#include <vespa/config/helper/configfetcher.hpp>
#include <vespa/vespalib/util/time.h>

#include <vespa/log/log.h>
LOG_SETUP(".translogserverapp");

using search::common::FileHeaderContext;

namespace search::transactionlog {

TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
                                     const FileHeaderContext & fileHeaderContext)
    : _lock(),
      _tls(),
      _tlsConfig(),
      _tlsConfigFetcher(std::make_unique<config::ConfigFetcher>(tlsConfigUri.getContext())),
      _fileHeaderContext(fileHeaderContext)
{
    _tlsConfigFetcher->subscribe<searchlib::TranslogserverConfig>(tlsConfigUri.getConfigId(), this);
    _tlsConfigFetcher->start();
}

namespace {

Encoding::Crc
getCrc(searchlib::TranslogserverConfig::Crcmethod crcType)
{
    switch (crcType) {
        case searchlib::TranslogserverConfig::Crcmethod::ccitt_crc32:
            return Encoding::Crc::ccitt_crc32;
        case searchlib::TranslogserverConfig::Crcmethod::xxh64:
            return Encoding::Crc::xxh64;
    }
    assert(false);
}

Encoding::Compression
getCompression(searchlib::TranslogserverConfig::Compression::Type type)
{
    switch (type) {
        case searchlib::TranslogserverConfig::Compression::Type::NONE:
        case searchlib::TranslogserverConfig::Compression::Type::NONE_MULTI:
            return Encoding::Compression::none_multi;
        case searchlib::TranslogserverConfig::Compression::Type::LZ4:
            return Encoding::Compression::lz4;
        case searchlib::TranslogserverConfig::Compression::Type::ZSTD:
            return Encoding::Compression::zstd;
    }
    assert(false);
}

Encoding
getEncoding(const searchlib::TranslogserverConfig & cfg)
{
    return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type));
}

DomainConfig
getDomainConfig(const searchlib::TranslogserverConfig & cfg) {
    DomainConfig dcfg;
    dcfg.setEncoding(getEncoding(cfg))
        .setCompressionLevel(cfg.compression.level)
        .setPartSizeLimit(cfg.filesizemax)
        .setChunkSizeLimit(cfg.chunk.sizelimit)
        .setFSyncOnCommit(cfg.usefsync);
    return dcfg;
}

void
logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dcfg) {
    LOG(config, "configure Transaction Log Server %s at port %d\n"
                "DomainConfig {encoding={%d, %d}, compression_level=%d, part_limit=%ld, chunk_limit=%ld}",
        cfg.servername.c_str(), cfg.listenport,
        dcfg.getEncoding().getCrc(), dcfg.getEncoding().getCompression(), dcfg.getCompressionlevel(),
        dcfg.getPartSizeLimit(), dcfg.getChunkSizeLimit());
}

size_t
derive_num_threads(uint32_t configured_cores, uint32_t actual_cores) {
    return (configured_cores > 0)
        ? configured_cores
        : std::max(1u, std::min(4u, actual_cores/8));
}

}

void
TransLogServerApp::start(FNET_Transport & transport, uint32_t num_cores)
{
    std::lock_guard<std::mutex> guard(_lock);
    auto c = _tlsConfig.get();
    DomainConfig domainConfig = getDomainConfig(*c);
    logReconfig(*c, domainConfig);
   _tls = std::make_shared<TransLogServer>(transport, c->servername, c->listenport, c->basedir, _fileHeaderContext,
                                            domainConfig, derive_num_threads(c->maxthreads, num_cores));
}

TransLogServerApp::~TransLogServerApp()
{
    _tlsConfigFetcher->close();
}

void
TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg)
{

    std::lock_guard<std::mutex> guard(_lock);
    DomainConfig dcfg = getDomainConfig(*cfg);
    logReconfig(*cfg, dcfg);
    _tlsConfig.set(cfg.release());
    _tlsConfig.latch();
    if (_tls) {
        _tls->setDomainConfig(dcfg);
    }
}

TransLogServer::SP
TransLogServerApp::getTransLogServer() const {
    std::lock_guard<std::mutex> guard(_lock);
    return _tls;
}

}