diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-22 18:36:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-22 18:36:09 +0200 |
commit | 297842af8553e3360ff3cab4ec67f7f54af42095 (patch) | |
tree | db4c3f12652ea256aeca7855321336b91403f4fb /searchlib | |
parent | 09b0e5a365e5b68b2787dbe401f8397aa64fc67f (diff) |
Revert "- Group commits to the TLS and sync to disk before acking."
Diffstat (limited to 'searchlib')
5 files changed, 13 insertions, 83 deletions
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index defce8c3421..38741745773 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -15,7 +15,7 @@ basedir string default="tmp" restart ## Use fsync after each commit. ## If not the below interval is used. -usefsync bool default=false +usefsync bool default=false restart ##Number of threads available for visiting/subscription. maxthreads int default=4 restart @@ -24,12 +24,12 @@ maxthreads int default=4 restart crcmethod enum {ccitt_crc32, xxh64} default=xxh64 ## Control compression type. -compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=NONE +compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4 ## Control compression level ## LZ4 has normal range 1..9 while ZSTD has range 1..19 ## 9 is a reasonable default for both -compression.level int default=3 +compression.level int default=9 ## How large a chunk can grow in memory before beeing flushed chunk.sizelimit int default = 256000 # 256k diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index bd7feec0598..9e0f1a8a1aa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,12 +113,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } } -Domain::~Domain() { - MonitorGuard guard(_currentChunkMonitor); - guard.broadcast(); - commitChunk(grabCurrentChunk(guard), guard); - _singleCommitter->shutdown().sync(); -} +Domain::~Domain() { } DomainInfo Domain::getDomainInfo() const @@ -323,73 +318,22 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } -void -Domain::append(const Packet & packet, Writer::DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - if (_lastSerial >= packet.range().from()) { - throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", - packet.range().from(), _lastSerial)); - } else { - _lastSerial = packet.range().to(); - } - _currentChunk->add(packet, std::move(onDone)); - commitIfFull(guard); -} - Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - if ( !_currentChunk->empty() ) { - auto completed = grabCurrentChunk(guard); - completed->setCommitDoneCallback(std::move(onDone)); - CommitResult result(completed->createCommitResult()); - commitChunk(std::move(completed), guard); - return result; - } + (void) onDone; return CommitResult(); } void -Domain::commitIfFull(const vespalib::MonitorGuard &guard) { - if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { - auto completed = std::move(_currentChunk); - _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); - commitChunk(std::move(completed), guard); - } -} - -std::unique_ptr<CommitChunk> -Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); - auto chunk = std::move(_currentChunk); - _currentChunk = createCommitChunk(_config); - return chunk; -} - -void -Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { - assert(chunkOrderGuard.monitors(_currentChunkMonitor)); - _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { - doCommit(std::move(chunk)); - })); -} - -void -Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { - const Packet & packet = chunk->getPacket(); - if (packet.empty()) return; - +Domain::append(const Packet & packet, Writer::DoneCallback onDone) +{ + (void) onDone; vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); DomainPart::SP dp = optionallyRotateFile(entry.serial()); dp->commit(entry.serial(), packet); - if (_config.getFSyncOnCommit()) { - dp->sync(); - } cleanSessions(); - LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", - chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 041ec27cf23..7e77e6ef0ef 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -56,11 +56,6 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: - void commitIfFull(const vespalib::MonitorGuard & guard); - - std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); - void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); - void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index b7e02894e6b..8855183226d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding), + : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression _compressionLevel(compressionLevel), _lock(), _fileLock(), @@ -396,19 +396,16 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if (_range.from() == 0) { _range.from(firstSerial); } - IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); for (size_t i(0); h.size() > 0; i++) { //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { chunk->add(entry); - if (_encoding.getCompression() == Encoding::Compression::none) { - write(*_transLog, *chunk); - chunk = IChunk::create(_encoding, _compressionLevel); - } + write(*_transLog, *chunk); _sz++; _range.to(entry.serial()); } else { @@ -416,9 +413,6 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) entry.serial(), _range.to())); } } - if ( ! chunk->getEntries().empty()) { - write(*_transLog, *chunk); - } bool merged(false); LockGuard guard(_lock); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 0c0c9186e12..7be3dd708a5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -572,11 +572,8 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) Packet packet(params[1]._data._buf, params[1]._data._len); try { vespalib::Gate gate; - { - // Need to scope in order to drain out all the callbacks. - domain->append(packet, make_shared<GateCallback>(gate)); - auto keep = domain->startCommit(make_shared<IgnoreCallback>()); - } + domain->append(packet, make_shared<GateCallback>(gate)); + auto keep = domain->startCommit(make_shared<IgnoreCallback>()); gate.await(); ret.AddInt32(0); ret.AddString("ok"); |