aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-23 15:19:58 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-01-10 10:01:02 +0100
commit64dcb17f1155fe2dfa507c309059e05be34a3f9f (patch)
treebd3b1fa3ca863799c5eaa216c04cc13775df901e /searchlib/src
parent746c52194d095a1e307a49c0695af4a5b71beebb (diff)
Wire in the remaining config and make it not require restart.
Diffstat (limited to 'searchlib/src')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp33
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.h3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp77
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h75
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp11
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp34
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp29
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h1
15 files changed, 188 insertions, 116 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index 9589d43c762..5b28fdc6567 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -225,7 +225,7 @@ bool Test::partialUpdateTest()
{
bool retval(false);
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
@@ -476,7 +476,7 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain)
bool Test::testVisitOverGeneratedDomain()
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
@@ -494,7 +494,8 @@ bool Test::testVisitOverGeneratedDomain()
void Test::createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains)
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, encoding);
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext,
+ DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, name, preExistingDomains);
@@ -505,7 +506,7 @@ void Test::createAndFillDomain(const vespalib::string & name, Encoding encoding,
void Test::verifyDomain(const vespalib::string & name)
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
@@ -523,7 +524,7 @@ void Test::testCrcVersions()
bool Test::testRemove()
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test-delete");
@@ -540,7 +541,7 @@ bool Test::testVisitOverPreExistingDomain()
{
// Depends on Test::testVisitOverGeneratedDomain()
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
@@ -594,7 +595,7 @@ void Test::testMany()
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000);
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "many", 0);
@@ -617,7 +618,7 @@ void Test::testMany()
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
@@ -645,7 +646,7 @@ void Test::testErase()
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000);
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "erase", 0);
@@ -654,7 +655,7 @@ void Test::testErase()
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
@@ -744,7 +745,7 @@ Test::testSync()
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -769,7 +770,7 @@ Test::testTruncateOnVersionMismatch()
size_t countOld(0);
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -790,7 +791,7 @@ Test::testTruncateOnVersionMismatch()
EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp)));
EXPECT_TRUE(f.Close());
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
uint64_t from(0), to(0);
@@ -816,7 +817,7 @@ Test::testTruncateOnShortRead()
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
createDomainTest(tls, domain, 0);
@@ -832,7 +833,7 @@ Test::testTruncateOnShortRead()
EXPECT_EQUAL(2u, countFiles(dir));
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES);
@@ -848,7 +849,7 @@ Test::testTruncateOnShortRead()
trfile.Close();
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1);
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index 62ef49ff689..6f2581d3799 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -698,7 +698,7 @@ TransLogStress::Main()
// start transaction log server
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize);
+ TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize));
TransLogClient client(tlsSpec);
client.create(domain);
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index fa4b7ef3f3e..b1bdb3765b1 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -5,7 +5,7 @@ namespace=searchlib
listenport int default=13700 restart
## Max file size (50M)
-filesizemax int default=50000000 restart
+filesizemax int default=50000000
## Server name to identify server.
servername string default="tls" restart
@@ -30,3 +30,9 @@ compression.type enum {NONE, LZ4, ZSTD} default=LZ4
## LZ4 has normal range 1..9 while ZSTD has range 1..19
## 9 is a reasonable default for both
compression.level int default=9
+
+## How large a chunk can grow in memory before beeing flushed
+chunk.sizelimit int default = 256000 # 256k
+
+## How long a chunk can reside in memory befor ebeeing flushed to disk.
+chunk.agelimit double default = 0.0010 # 10 milliseconds
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
index e4e98e60d3f..dea0ccf9b9a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
@@ -95,9 +95,9 @@ XXH64Compressed::decompress(nbostream & is) {
is.adjustReadPos(is.size());
}
-XXH64Compressed::XXH64Compressed(CompressionConfig::Type type)
+XXH64Compressed::XXH64Compressed(CompressionConfig::Type type, uint8_t level)
: _type(type),
- _level(9)
+ _level(level)
{ }
Encoding
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
index 85696b68245..cf88bc0a3ed 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
@@ -24,8 +24,7 @@ public:
class XXH64Compressed : public IChunk {
public:
using CompressionConfig = vespalib::compression::CompressionConfig;
- XXH64Compressed(CompressionConfig::Type);
- void setLevel(uint8_t level) { _level = level; }
+ XXH64Compressed(CompressionConfig::Type, uint8_t level);
protected:
void decompress(nbostream & os);
Encoding compress(nbostream & os, Encoding::Crc crc) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index c3b2738eac1..31ad57de1b6 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -21,35 +21,39 @@ using search::common::FileHeaderContext;
using std::runtime_error;
using namespace std::chrono_literals;
using namespace std::chrono;
+using std::make_shared;
namespace search::transactionlog {
+DomainConfig::DomainConfig()
+ : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none),
+ _compressionLevel(9),
+ _partSizeLimit(0x10000000), // 256M
+ _chunkSizeLimit(0x40000), // 256k
+ _chunkAgeLimit(1ms)
+{ }
Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool,
- Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize,
- Encoding defaultEncoding, const FileHeaderContext &fileHeaderContext) :
- _currentChunk(std::make_unique<Chunk>()),
- _lastSerial(0),
- _defaultEncoding(defaultEncoding),
- _threadPool(threadPool),
- _commitExecutor(commitExecutor),
- _sessionExecutor(sessionExecutor),
- _sessionId(1),
- _syncMonitor(),
- _pendingSync(false),
- _name(domainName),
- _domainPartSizeLimit(domainPartSize),
- _chunkSizeLimit(0x40000),
- _chunkAgeLimit(1ms),
- _parts(),
- _lock(),
- _currentChunkMonitor(),
- _sessionLock(),
- _sessions(),
- _maxSessionRunTime(),
- _baseDir(baseDir),
- _fileHeaderContext(fileHeaderContext),
- _markedDeleted(false),
- _self(nullptr)
+ Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
+ const FileHeaderContext &fileHeaderContext)
+ : _config(cfg),
+ _currentChunk(std::make_unique<Chunk>()),
+ _lastSerial(0),
+ _threadPool(threadPool),
+ _commitExecutor(commitExecutor),
+ _sessionExecutor(sessionExecutor),
+ _sessionId(1),
+ _syncMonitor(),
+ _pendingSync(false),
+ _name(domainName),
+ _parts(),
+ _lock(),
+ _currentChunkMonitor(),
+ _sessionLock(),
+ _sessions(),
+ _baseDir(baseDir),
+ _fileHeaderContext(fileHeaderContext),
+ _markedDeleted(false),
+ _self(nullptr)
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
@@ -67,24 +71,32 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo
}
_sessionExecutor.sync();
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
- _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultEncoding, _fileHeaderContext, false));
+ _parts[lastPart] = make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(),
+ _config.getCompressionlevel(), _fileHeaderContext, false);
}
_lastSerial = end();
_self = _threadPool.NewThread(this);
assert(_self);
}
+Domain &
+Domain::setConfig(const DomainConfig & cfg) {
+ _config = cfg;
+ return *this;
+}
+
void
Domain::Run(FastOS_ThreadInterface *thisThread, void *) {
while (!thisThread->GetBreakFlag()) {
vespalib::MonitorGuard guard(_currentChunkMonitor);
- guard.wait(duration_cast<milliseconds>(_chunkAgeLimit).count());
+ guard.wait(duration_cast<milliseconds>(_config.getChunkAgeLimit()).count());
commitIfStale(guard);
}
}
void Domain::addPart(int64_t partId, bool isLastPart) {
- DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultEncoding, _fileHeaderContext, isLastPart));
+ auto dp = make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(),
+ _config.getCompressionlevel(), _fileHeaderContext, isLastPart);
if (dp->size() == 0) {
// Only last domain part is allowed to be truncated down to
// empty size.
@@ -343,7 +355,7 @@ Domain::commit(const Packet & packet, Writer::DoneCallback onDone) {
_lastSerial = packet.range().to();
}
_currentChunk->add(packet, std::move(onDone));
- if (_currentChunk->sizeBytes() > _chunkSizeLimit) {
+ if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
completed = grabCurrentChunk(guard);
}
if (completed) {
@@ -362,7 +374,7 @@ Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
void
Domain::commitIfStale(const vespalib::MonitorGuard & guard) {
assert(guard.monitors(_currentChunkMonitor));
- if (_currentChunk->age() > _chunkAgeLimit) {
+ if (_currentChunk->age() > _config.getChunkAgeLimit()) {
commitChunk(grabCurrentChunk(guard), guard);
}
}
@@ -376,12 +388,13 @@ Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard &
vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
- if (dp->byteSize() > _domainPartSizeLimit) {
+ if (dp->byteSize() > _config.getPartSizeLimit()) {
waitPendingSync(_syncMonitor, _pendingSync);
triggerSyncNow();
waitPendingSync(_syncMonitor, _pendingSync);
dp->close();
- dp.reset(new DomainPart(_name, dir(), entry.serial(), _defaultEncoding, _fileHeaderContext, false));
+ dp = make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(),
+ _config.getCompressionlevel(), _fileHeaderContext, false);
{
LockGuard guard(_lock);
_parts[entry.serial()] = dp;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index c07459b0da9..9165e22f611 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -8,16 +8,39 @@
namespace search::transactionlog {
+class DomainConfig {
+public:
+ using microseconds = std::chrono::microseconds;
+ DomainConfig();
+ DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; }
+ DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; }
+ DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; }
+ DomainConfig & setChunkAgeLimit(microseconds v) { _chunkAgeLimit = v; return *this; }
+ DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; }
+ Encoding getEncoding() const { return _encoding; }
+ size_t getPartSizeLimit() const { return _partSizeLimit; }
+ size_t getChunkSizeLimit() const { return _chunkSizeLimit; }
+ microseconds getChunkAgeLimit() const { return _chunkAgeLimit; }
+ uint8_t getCompressionlevel() const { return _compressionLevel; }
+private:
+ Encoding _encoding;
+ uint8_t _compressionLevel;
+ size_t _partSizeLimit;
+ size_t _chunkSizeLimit;
+ microseconds _chunkAgeLimit;
+};
+
struct PartInfo {
SerialNumRange range;
size_t numEntries;
size_t byteSize;
vespalib::string file;
- PartInfo(SerialNumRange range_in, size_t numEntries_in,
- size_t byteSize_in,
- vespalib::stringref file_in)
- : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in),
- file(file_in) {}
+ PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in)
+ : range(range_in),
+ numEntries(numEntries_in),
+ byteSize(byteSize_in),
+ file(file_in)
+ {}
};
struct DomainInfo {
@@ -41,8 +64,8 @@ public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::ThreadExecutor;
Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool,
- Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize,
- Encoding defaultEncoding, const common::FileHeaderContext &fileHeaderContext);
+ Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
+ const common::FileHeaderContext &fileHeaderContext);
~Domain() override;
@@ -77,7 +100,7 @@ public:
return _sessionExecutor.execute(std::move(task));
}
uint64_t size() const;
-
+ Domain & setConfig(const DomainConfig & cfg);
private:
void Run(FastOS_ThreadInterface *thisThread, void *arguments) override;
void commitIfStale(const vespalib::MonitorGuard & guard);
@@ -113,28 +136,24 @@ private:
using DomainPartList = std::map<int64_t, DomainPart::SP>;
using DurationSeconds = std::chrono::duration<double>;
+ DomainConfig _config;
std::unique_ptr<Chunk> _currentChunk;
- SerialNum _lastSerial;
- Encoding _defaultEncoding;
- FastOS_ThreadPool & _threadPool;
- Executor & _commitExecutor;
- Executor & _sessionExecutor;
- std::atomic<int> _sessionId;
- vespalib::Monitor _syncMonitor;
- bool _pendingSync;
- vespalib::string _name;
- const uint64_t _domainPartSizeLimit;
- const uint64_t _chunkSizeLimit;
- const std::chrono::microseconds _chunkAgeLimit;
- DomainPartList _parts;
- vespalib::Lock _lock;
- vespalib::Monitor _currentChunkMonitor;
- vespalib::Lock _sessionLock;
- SessionList _sessions;
- DurationSeconds _maxSessionRunTime;
- vespalib::string _baseDir;
+ SerialNum _lastSerial;
+ FastOS_ThreadPool & _threadPool;
+ Executor & _commitExecutor;
+ Executor & _sessionExecutor;
+ std::atomic<int> _sessionId;
+ vespalib::Monitor _syncMonitor;
+ bool _pendingSync;
+ vespalib::string _name;
+ DomainPartList _parts;
+ vespalib::Lock _lock;
+ vespalib::Monitor _currentChunkMonitor;
+ vespalib::Lock _sessionLock;
+ SessionList _sessions;
+ vespalib::string _baseDir;
const common::FileHeaderContext &_fileHeaderContext;
- bool _markedDeleted;
+ bool _markedDeleted;
FastOS_ThreadInterface * _self;
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 4724f896962..756f54ad2c0 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -251,9 +251,10 @@ DomainPart::buildPacketMapping(bool allowTruncate)
return currPos;
}
-DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding defaultEncoding,
- const FileHeaderContext &fileHeaderContext, bool allowTruncate) :
- _defaultEncoding(defaultEncoding),
+DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
+ uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) :
+ _encoding(encoding),
+ _compressionLevel(compressionLevel),
_lock(),
_fileLock(),
_range(s),
@@ -397,7 +398,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
//LOG(spam,
//"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
//h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
- IChunk::UP chunk = IChunk::create(_defaultEncoding);
+ IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
Packet::Entry entry;
entry.deserialize(h);
if (_range.to() < entry.serial()) {
@@ -535,7 +536,7 @@ DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk)
{
nbostream os;
size_t begin = os.wp();
- os << _defaultEncoding.getRaw();
+ os << _encoding.getRaw();
os << uint32_t(0);
Encoding realEncoding = chunk.encode(os);
size_t end = os.wp();
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index 9f92c49eaf5..5256b731125 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -22,7 +22,7 @@ private:
public:
typedef std::shared_ptr<DomainPart> SP;
DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding,
- const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
+ uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
~DomainPart();
@@ -74,7 +74,8 @@ private:
};
typedef std::vector<SkipInfo> SkipList;
typedef std::map<SerialNum, Packet> PacketList;
- const Encoding _defaultEncoding;
+ const Encoding _encoding;
+ const uint8_t _compressionLevel;
vespalib::Lock _lock;
vespalib::Lock _fileLock;
SerialNumRange _range;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
index e8a6afc106b..af7d396c0e4 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
@@ -63,21 +63,21 @@ IChunk::decode(nbostream & is) {
IChunk::UP
IChunk::create(uint8_t chunkType) {
- return create(Encoding(chunkType));
+ return create(Encoding(chunkType), 9);
}
IChunk::UP
-IChunk::create(Encoding encoding) {
+IChunk::create(Encoding encoding, uint8_t compressionLevel) {
switch (encoding.getCrc()) {
case Encoding::Crc::xxh64:
switch (encoding.getCompression()) {
case Encoding::Compression::none:
return make_unique<XXH64None>();
case Encoding::Compression::lz4:
- return make_unique<XXH64Compressed>(CompressionConfig::LZ4);
+ return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel);
case Encoding::Compression::zstd:
- return make_unique<XXH64Compressed>(CompressionConfig::ZSTD);
+ return make_unique<XXH64Compressed>(CompressionConfig::ZSTD, compressionLevel);
default:
- return make_unique<XXH64Compressed>(CompressionConfig::LZ4);
+ return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel);
}
case Encoding::Crc::ccitt_crc32:
switch (encoding.getCompression()) {
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index 9186b3b12ff..0e913703468 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -43,7 +43,7 @@ public:
Encoding encode(nbostream & os) const;
void decode(nbostream & buf);
static UP create(uint8_t chunkType);
- static UP create(Encoding chunkType);
+ static UP create(Encoding chunkType, uint8_t compressionLevel);
SerialNumRange range() const;
protected:
virtual Encoding onEncode(nbostream & os) const = 0;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index db5b0cd0607..f724f5035c8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -16,6 +16,7 @@ using vespalib::IllegalArgumentException;
using search::common::FileHeaderContext;
using std::make_shared;
using std::runtime_error;
+using namespace std::chrono_literals;
namespace search::transactionlog {
@@ -73,22 +74,22 @@ SyncHandler::PerformTask()
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const FileHeaderContext &fileHeaderContext)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000)
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext,
+ DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none))
+ .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 1ms))
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, Encoding::xxh64)
+ const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg)
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4)
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize,
- size_t maxThreads, Encoding defaultCrcType)
+ const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads)
: FRT_Invokable(),
_name(name),
_baseDir(baseDir),
- _domainPartSize(domainPartSize),
- _defaultEncoding(defaultCrcType),
+ _domainConfig(cfg),
_commitExecutor(maxThreads, 128*1024),
_sessionExecutor(maxThreads, 128*1024),
_threadPool(0x20000),
@@ -106,8 +107,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
domainDir >> domainName;
if ( ! domainName.empty()) {
try {
- auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor,
- _domainPartSize, _defaultEncoding,_fileHeaderContext);
+ auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor,
+ _sessionExecutor, cfg,_fileHeaderContext);
_domains[domain->name()] = domain;
} catch (const std::exception & e) {
LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what());
@@ -211,6 +212,17 @@ void TransLogServer::logMetric() const
}
}
+
+TransLogServer &
+TransLogServer::setDomainConfig(const DomainConfig & cfg) {
+ Guard domainGuard(_lock);
+ _domainConfig = cfg;
+ for(auto &domain: _domains) {
+ domain.second->setConfig(cfg);
+ }
+ return *this;
+}
+
DomainStats
TransLogServer::getDomainStats() const
{
@@ -345,8 +357,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req)
Domain::SP domain(findDomain(domainName));
if ( !domain ) {
try {
- domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor,
- _domainPartSize, _defaultEncoding, _fileHeaderContext);
+ domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor,
+ _sessionExecutor, _domainConfig, _fileHeaderContext);
{
Guard domainGuard(_lock);
_domains[domain->name()] = domain;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 225412cdaf6..83627d2d3e3 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -22,16 +22,15 @@ public:
typedef std::shared_ptr<TransLogServer> SP;
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext,
- uint64_t domainPartSize, size_t maxThreads, Encoding defaultCrc);
+ const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize);
+ const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const common::FileHeaderContext &fileHeaderContext);
~TransLogServer() override;
DomainStats getDomainStats() const;
-
void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
+ TransLogServer & setDomainConfig(const DomainConfig & cfg);
class Session
{
@@ -79,8 +78,7 @@ private:
vespalib::string _name;
vespalib::string _baseDir;
- const uint64_t _domainPartSize;
- const Encoding _defaultEncoding;
+ DomainConfig _domainConfig;
vespalib::ThreadStackExecutor _commitExecutor;
vespalib::ThreadStackExecutor _sessionExecutor;
FastOS_ThreadPool _threadPool;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index 60447e0c098..de3d72331d6 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -7,12 +7,17 @@
LOG_SETUP(".translogserverapp");
using search::common::FileHeaderContext;
+using namespace std::chrono_literals;
namespace search::transactionlog {
+using LockGuard = std::lock_guard<std::mutex>;
+using std::make_unique;
+
TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
const FileHeaderContext & fileHeaderContext)
- : _tls(),
+ : _lock(),
+ _tls(),
_tlsConfig(),
_tlsConfigFetcher(tlsConfigUri.getContext()),
_fileHeaderContext(fileHeaderContext)
@@ -56,13 +61,25 @@ getEncoding(const searchlib::TranslogserverConfig & cfg)
return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type));
}
+DomainConfig
+getDomainConfig(const searchlib::TranslogserverConfig & cfg) {
+ DomainConfig dcfg;
+ dcfg.setEncoding(getEncoding(cfg))
+ .setCompressionLevel(cfg.compression.level)
+ .setPartSizeLimit(cfg.filesizemax)
+ .setChunkSizeLimit(cfg.chunk.sizelimit)
+ .setChunkAgeLimit(std::chrono::microseconds(int64_t(cfg.chunk.agelimit*1000000)));
+ return dcfg;
+}
+
}
void TransLogServerApp::start()
{
- std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get();
- _tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext,
- c->filesizemax, c->maxthreads, getEncoding(*c)));
+ LockGuard guard(_lock);
+ auto c = _tlsConfig.get();
+ _tls = make_unique<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext,
+ getDomainConfig(*c), c->maxthreads);
}
TransLogServerApp::~TransLogServerApp()
@@ -73,8 +90,12 @@ TransLogServerApp::~TransLogServerApp()
void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg)
{
LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport);
+ LockGuard guard(_lock);
_tlsConfig.set(cfg.release());
_tlsConfig.latch();
+ if (_tls) {
+ _tls->setDomainConfig(getDomainConfig(*cfg));
+ }
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
index 35fa994d1e4..ea6d0158cec 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
@@ -14,6 +14,7 @@ namespace search::transactionlog {
class TransLogServerApp : public config::IFetcherCallback<searchlib::TranslogserverConfig>
{
private:
+ std::mutex _lock;
TransLogServer::SP _tls;
vespalib::PtrHolder<searchlib::TranslogserverConfig> _tlsConfig;
config::ConfigFetcher _tlsConfigFetcher;