diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-23 15:19:58 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-10 10:01:02 +0100 |
commit | 64dcb17f1155fe2dfa507c309059e05be34a3f9f (patch) | |
tree | bd3b1fa3ca863799c5eaa216c04cc13775df901e /searchlib | |
parent | 746c52194d095a1e307a49c0695af4a5b71beebb (diff) |
Wire in the remaining config and make it not require restart.
Diffstat (limited to 'searchlib')
15 files changed, 188 insertions, 116 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 9589d43c762..5b28fdc6567 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -225,7 +225,7 @@ bool Test::partialUpdateTest() { bool retval(false); DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); @@ -476,7 +476,7 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) bool Test::testVisitOverGeneratedDomain() { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -494,7 +494,8 @@ bool Test::testVisitOverGeneratedDomain() void Test::createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains) { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, encoding); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, + DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, name, preExistingDomains); @@ -505,7 +506,7 @@ void Test::createAndFillDomain(const vespalib::string & name, Encoding encoding, void Test::verifyDomain(const vespalib::string & name) { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); @@ -523,7 +524,7 @@ void Test::testCrcVersions() bool Test::testRemove() { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -540,7 +541,7 @@ bool Test::testVisitOverPreExistingDomain() { // Depends on Test::testVisitOverGeneratedDomain() DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -594,7 +595,7 @@ void Test::testMany() const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "many", 0); @@ -617,7 +618,7 @@ void Test::testMany() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); @@ -645,7 +646,7 @@ void Test::testErase() const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -654,7 +655,7 @@ void Test::testErase() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); @@ -744,7 +745,7 @@ Test::testSync() const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -769,7 +770,7 @@ Test::testTruncateOnVersionMismatch() size_t countOld(0); DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -790,7 +791,7 @@ Test::testTruncateOnVersionMismatch() EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -816,7 +817,7 @@ Test::testTruncateOnShortRead() DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -832,7 +833,7 @@ Test::testTruncateOnShortRead() EXPECT_EQUAL(2u, countFiles(dir)); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); @@ -848,7 +849,7 @@ Test::testTruncateOnShortRead() trfile.Close(); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 62ef49ff689..6f2581d3799 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -698,7 +698,7 @@ TransLogStress::Main() // start transaction log server DummyFileHeaderContext fileHeaderContext; - TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize); + TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); TransLogClient client(tlsSpec); client.create(domain); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index fa4b7ef3f3e..b1bdb3765b1 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -5,7 +5,7 @@ namespace=searchlib listenport int default=13700 restart ## Max file size (50M) -filesizemax int default=50000000 restart +filesizemax int default=50000000 ## Server name to identify server. servername string default="tls" restart @@ -30,3 +30,9 @@ compression.type enum {NONE, LZ4, ZSTD} default=LZ4 ## LZ4 has normal range 1..9 while ZSTD has range 1..19 ## 9 is a reasonable default for both compression.level int default=9 + +## How large a chunk can grow in memory before beeing flushed +chunk.sizelimit int default = 256000 # 256k + +## How long a chunk can reside in memory befor ebeeing flushed to disk. +chunk.agelimit double default = 0.0010 # 10 milliseconds diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp index e4e98e60d3f..dea0ccf9b9a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp @@ -95,9 +95,9 @@ XXH64Compressed::decompress(nbostream & is) { is.adjustReadPos(is.size()); } -XXH64Compressed::XXH64Compressed(CompressionConfig::Type type) +XXH64Compressed::XXH64Compressed(CompressionConfig::Type type, uint8_t level) : _type(type), - _level(9) + _level(level) { } Encoding diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h index 85696b68245..cf88bc0a3ed 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h @@ -24,8 +24,7 @@ public: class XXH64Compressed : public IChunk { public: using CompressionConfig = vespalib::compression::CompressionConfig; - XXH64Compressed(CompressionConfig::Type); - void setLevel(uint8_t level) { _level = level; } + XXH64Compressed(CompressionConfig::Type, uint8_t level); protected: void decompress(nbostream & os); Encoding compress(nbostream & os, Encoding::Crc crc) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index c3b2738eac1..31ad57de1b6 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -21,35 +21,39 @@ using search::common::FileHeaderContext; using std::runtime_error; using namespace std::chrono_literals; using namespace std::chrono; +using std::make_shared; namespace search::transactionlog { +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(1ms) +{ } Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, - Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize, - Encoding defaultEncoding, const FileHeaderContext &fileHeaderContext) : - _currentChunk(std::make_unique<Chunk>()), - _lastSerial(0), - _defaultEncoding(defaultEncoding), - _threadPool(threadPool), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), - _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _name(domainName), - _domainPartSizeLimit(domainPartSize), - _chunkSizeLimit(0x40000), - _chunkAgeLimit(1ms), - _parts(), - _lock(), - _currentChunkMonitor(), - _sessionLock(), - _sessions(), - _maxSessionRunTime(), - _baseDir(baseDir), - _fileHeaderContext(fileHeaderContext), - _markedDeleted(false), - _self(nullptr) + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + const FileHeaderContext &fileHeaderContext) + : _config(cfg), + _currentChunk(std::make_unique<Chunk>()), + _lastSerial(0), + _threadPool(threadPool), + _commitExecutor(commitExecutor), + _sessionExecutor(sessionExecutor), + _sessionId(1), + _syncMonitor(), + _pendingSync(false), + _name(domainName), + _parts(), + _lock(), + _currentChunkMonitor(), + _sessionLock(), + _sessions(), + _baseDir(baseDir), + _fileHeaderContext(fileHeaderContext), + _markedDeleted(false), + _self(nullptr) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -67,24 +71,32 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo } _sessionExecutor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { - _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultEncoding, _fileHeaderContext, false)); + _parts[lastPart] = make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); } _lastSerial = end(); _self = _threadPool.NewThread(this); assert(_self); } +Domain & +Domain::setConfig(const DomainConfig & cfg) { + _config = cfg; + return *this; +} + void Domain::Run(FastOS_ThreadInterface *thisThread, void *) { while (!thisThread->GetBreakFlag()) { vespalib::MonitorGuard guard(_currentChunkMonitor); - guard.wait(duration_cast<milliseconds>(_chunkAgeLimit).count()); + guard.wait(duration_cast<milliseconds>(_config.getChunkAgeLimit()).count()); commitIfStale(guard); } } void Domain::addPart(int64_t partId, bool isLastPart) { - DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultEncoding, _fileHeaderContext, isLastPart)); + auto dp = make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, isLastPart); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -343,7 +355,7 @@ Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { _lastSerial = packet.range().to(); } _currentChunk->add(packet, std::move(onDone)); - if (_currentChunk->sizeBytes() > _chunkSizeLimit) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { completed = grabCurrentChunk(guard); } if (completed) { @@ -362,7 +374,7 @@ Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { void Domain::commitIfStale(const vespalib::MonitorGuard & guard) { assert(guard.monitors(_currentChunkMonitor)); - if (_currentChunk->age() > _chunkAgeLimit) { + if (_currentChunk->age() > _config.getChunkAgeLimit()) { commitChunk(grabCurrentChunk(guard), guard); } } @@ -376,12 +388,13 @@ Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _domainPartSizeLimit) { + if (dp->byteSize() > _config.getPartSizeLimit()) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); dp->close(); - dp.reset(new DomainPart(_name, dir(), entry.serial(), _defaultEncoding, _fileHeaderContext, false)); + dp = make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); { LockGuard guard(_lock); _parts[entry.serial()] = dp; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index c07459b0da9..9165e22f611 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -8,16 +8,39 @@ namespace search::transactionlog { +class DomainConfig { +public: + using microseconds = std::chrono::microseconds; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(microseconds v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + microseconds getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + microseconds _chunkAgeLimit; +}; + struct PartInfo { SerialNumRange range; size_t numEntries; size_t byteSize; vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, - size_t byteSize_in, - vespalib::stringref file_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), - file(file_in) {} + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} }; struct DomainInfo { @@ -41,8 +64,8 @@ public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::ThreadExecutor; Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, - Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize, - Encoding defaultEncoding, const common::FileHeaderContext &fileHeaderContext); + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + const common::FileHeaderContext &fileHeaderContext); ~Domain() override; @@ -77,7 +100,7 @@ public: return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; - + Domain & setConfig(const DomainConfig & cfg); private: void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; void commitIfStale(const vespalib::MonitorGuard & guard); @@ -113,28 +136,24 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; + DomainConfig _config; std::unique_ptr<Chunk> _currentChunk; - SerialNum _lastSerial; - Encoding _defaultEncoding; - FastOS_ThreadPool & _threadPool; - Executor & _commitExecutor; - Executor & _sessionExecutor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - const uint64_t _domainPartSizeLimit; - const uint64_t _chunkSizeLimit; - const std::chrono::microseconds _chunkAgeLimit; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Monitor _currentChunkMonitor; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; + SerialNum _lastSerial; + FastOS_ThreadPool & _threadPool; + Executor & _commitExecutor; + Executor & _sessionExecutor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Monitor _currentChunkMonitor; + vespalib::Lock _sessionLock; + SessionList _sessions; + vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; + bool _markedDeleted; FastOS_ThreadInterface * _self; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 4724f896962..756f54ad2c0 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -251,9 +251,10 @@ DomainPart::buildPacketMapping(bool allowTruncate) return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding defaultEncoding, - const FileHeaderContext &fileHeaderContext, bool allowTruncate) : - _defaultEncoding(defaultEncoding), +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, + uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) : + _encoding(encoding), + _compressionLevel(compressionLevel), _lock(), _fileLock(), _range(s), @@ -397,7 +398,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); - IChunk::UP chunk = IChunk::create(_defaultEncoding); + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { @@ -535,7 +536,7 @@ DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) { nbostream os; size_t begin = os.wp(); - os << _defaultEncoding.getRaw(); + os << _encoding.getRaw(); os << uint32_t(0); Encoding realEncoding = chunk.encode(os); size_t end = os.wp(); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 9f92c49eaf5..5256b731125 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -22,7 +22,7 @@ private: public: typedef std::shared_ptr<DomainPart> SP; DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, - const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); @@ -74,7 +74,8 @@ private: }; typedef std::vector<SkipInfo> SkipList; typedef std::map<SerialNum, Packet> PacketList; - const Encoding _defaultEncoding; + const Encoding _encoding; + const uint8_t _compressionLevel; vespalib::Lock _lock; vespalib::Lock _fileLock; SerialNumRange _range; diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index e8a6afc106b..af7d396c0e4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -63,21 +63,21 @@ IChunk::decode(nbostream & is) { IChunk::UP IChunk::create(uint8_t chunkType) { - return create(Encoding(chunkType)); + return create(Encoding(chunkType), 9); } IChunk::UP -IChunk::create(Encoding encoding) { +IChunk::create(Encoding encoding, uint8_t compressionLevel) { switch (encoding.getCrc()) { case Encoding::Crc::xxh64: switch (encoding.getCompression()) { case Encoding::Compression::none: return make_unique<XXH64None>(); case Encoding::Compression::lz4: - return make_unique<XXH64Compressed>(CompressionConfig::LZ4); + return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel); case Encoding::Compression::zstd: - return make_unique<XXH64Compressed>(CompressionConfig::ZSTD); + return make_unique<XXH64Compressed>(CompressionConfig::ZSTD, compressionLevel); default: - return make_unique<XXH64Compressed>(CompressionConfig::LZ4); + return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel); } case Encoding::Crc::ccitt_crc32: switch (encoding.getCompression()) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 9186b3b12ff..0e913703468 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -43,7 +43,7 @@ public: Encoding encode(nbostream & os) const; void decode(nbostream & buf); static UP create(uint8_t chunkType); - static UP create(Encoding chunkType); + static UP create(Encoding chunkType, uint8_t compressionLevel); SerialNumRange range() const; protected: virtual Encoding onEncode(nbostream & os) const = 0; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index db5b0cd0607..f724f5035c8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -16,6 +16,7 @@ using vespalib::IllegalArgumentException; using search::common::FileHeaderContext; using std::make_shared; using std::runtime_error; +using namespace std::chrono_literals; namespace search::transactionlog { @@ -73,22 +74,22 @@ SyncHandler::PerformTask() TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, + DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none)) + .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 1ms)) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, Encoding::xxh64) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize, - size_t maxThreads, Encoding defaultCrcType) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) : FRT_Invokable(), _name(name), _baseDir(baseDir), - _domainPartSize(domainPartSize), - _defaultEncoding(defaultCrcType), + _domainConfig(cfg), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), _threadPool(0x20000), @@ -106,8 +107,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultEncoding,_fileHeaderContext); + auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, + _sessionExecutor, cfg,_fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -211,6 +212,17 @@ void TransLogServer::logMetric() const } } + +TransLogServer & +TransLogServer::setDomainConfig(const DomainConfig & cfg) { + Guard domainGuard(_lock); + _domainConfig = cfg; + for(auto &domain: _domains) { + domain.second->setConfig(cfg); + } + return *this; +} + DomainStats TransLogServer::getDomainStats() const { @@ -345,8 +357,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultEncoding, _fileHeaderContext); + domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, + _sessionExecutor, _domainConfig, _fileHeaderContext); { Guard domainGuard(_lock); _domains[domain->name()] = domain; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 225412cdaf6..83627d2d3e3 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -22,16 +22,15 @@ public: typedef std::shared_ptr<TransLogServer> SP; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, - uint64_t domainPartSize, size_t maxThreads, Encoding defaultCrc); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; - void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; + TransLogServer & setDomainConfig(const DomainConfig & cfg); class Session { @@ -79,8 +78,7 @@ private: vespalib::string _name; vespalib::string _baseDir; - const uint64_t _domainPartSize; - const Encoding _defaultEncoding; + DomainConfig _domainConfig; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor _sessionExecutor; FastOS_ThreadPool _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index 60447e0c098..de3d72331d6 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -7,12 +7,17 @@ LOG_SETUP(".translogserverapp"); using search::common::FileHeaderContext; +using namespace std::chrono_literals; namespace search::transactionlog { +using LockGuard = std::lock_guard<std::mutex>; +using std::make_unique; + TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, const FileHeaderContext & fileHeaderContext) - : _tls(), + : _lock(), + _tls(), _tlsConfig(), _tlsConfigFetcher(tlsConfigUri.getContext()), _fileHeaderContext(fileHeaderContext) @@ -56,13 +61,25 @@ getEncoding(const searchlib::TranslogserverConfig & cfg) return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); } +DomainConfig +getDomainConfig(const searchlib::TranslogserverConfig & cfg) { + DomainConfig dcfg; + dcfg.setEncoding(getEncoding(cfg)) + .setCompressionLevel(cfg.compression.level) + .setPartSizeLimit(cfg.filesizemax) + .setChunkSizeLimit(cfg.chunk.sizelimit) + .setChunkAgeLimit(std::chrono::microseconds(int64_t(cfg.chunk.agelimit*1000000))); + return dcfg; +} + } void TransLogServerApp::start() { - std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); - _tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext, - c->filesizemax, c->maxthreads, getEncoding(*c))); + LockGuard guard(_lock); + auto c = _tlsConfig.get(); + _tls = make_unique<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, + getDomainConfig(*c), c->maxthreads); } TransLogServerApp::~TransLogServerApp() @@ -73,8 +90,12 @@ TransLogServerApp::~TransLogServerApp() void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) { LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport); + LockGuard guard(_lock); _tlsConfig.set(cfg.release()); _tlsConfig.latch(); + if (_tls) { + _tls->setDomainConfig(getDomainConfig(*cfg)); + } } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index 35fa994d1e4..ea6d0158cec 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -14,6 +14,7 @@ namespace search::transactionlog { class TransLogServerApp : public config::IFetcherCallback<searchlib::TranslogserverConfig> { private: + std::mutex _lock; TransLogServer::SP _tls; vespalib::PtrHolder<searchlib::TranslogserverConfig> _tlsConfig; config::ConfigFetcher _tlsConfigFetcher; |