diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-23 17:37:46 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 17:37:46 +0200 |
commit | 57b71a601f169d1444038227189f9dd8608ccb63 (patch) | |
tree | e313697b7a88500655ce3264c0ffcf43a2dce07c /searchlib | |
parent | 32e77ca9bea79a56161a5bf3dfb4a4eee027b68f (diff) |
Revert "Revert "Balder/group operations to tls and commit in batches""
Diffstat (limited to 'searchlib')
5 files changed, 81 insertions, 11 deletions
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 540895b2404..defce8c3421 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -15,7 +15,7 @@ basedir string default="tmp" restart ## Use fsync after each commit. ## If not the below interval is used. -usefsync bool default=false restart +usefsync bool default=false ##Number of threads available for visiting/subscription. maxthreads int default=4 restart diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 9e0f1a8a1aa..bd7feec0598 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } } -Domain::~Domain() { } +Domain::~Domain() { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + commitChunk(grabCurrentChunk(guard), guard); + _singleCommitter->shutdown().sync(); +} DomainInfo Domain::getDomainInfo() const @@ -318,22 +323,73 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } +void +Domain::append(const Packet & packet, Writer::DoneCallback onDone) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (_lastSerial >= packet.range().from()) { + throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + commitIfFull(guard); +} + Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - (void) onDone; + vespalib::MonitorGuard guard(_currentChunkMonitor); + if ( !_currentChunk->empty() ) { + auto completed = grabCurrentChunk(guard); + completed->setCommitDoneCallback(std::move(onDone)); + CommitResult result(completed->createCommitResult()); + commitChunk(std::move(completed), guard); + return result; + } return CommitResult(); } void -Domain::append(const Packet & packet, Writer::DoneCallback onDone) -{ - (void) onDone; +Domain::commitIfFull(const vespalib::MonitorGuard &guard) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + auto completed = std::move(_currentChunk); + _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); + commitChunk(std::move(completed), guard); + } +} + +std::unique_ptr<CommitChunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = createCommitChunk(_config); + return chunk; +} + +void +Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { + doCommit(std::move(chunk)); + })); +} + +void +Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { + const Packet & packet = chunk->getPacket(); + if (packet.empty()) return; + vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); DomainPart::SP dp = optionallyRotateFile(entry.serial()); dp->commit(entry.serial(), packet); + if (_config.getFSyncOnCommit()) { + dp->sync(); + } cleanSessions(); + LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", + chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 7e77e6ef0ef..041ec27cf23 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -56,6 +56,11 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: + void commitIfFull(const vespalib::MonitorGuard & guard); + + std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 8855183226d..b7e02894e6b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression + : _encoding(encoding), _compressionLevel(compressionLevel), _lock(), _fileLock(), @@ -396,16 +396,19 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if (_range.from() == 0) { _range.from(firstSerial); } + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); for (size_t i(0); h.size() > 0; i++) { //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); - IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { chunk->add(entry); - write(*_transLog, *chunk); + if (_encoding.getCompression() == Encoding::Compression::none) { + write(*_transLog, *chunk); + chunk = IChunk::create(_encoding, _compressionLevel); + } _sz++; _range.to(entry.serial()); } else { @@ -413,6 +416,9 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) entry.serial(), _range.to())); } } + if ( ! chunk->getEntries().empty()) { + write(*_transLog, *chunk); + } bool merged(false); LockGuard guard(_lock); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 7be3dd708a5..0c0c9186e12 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -572,8 +572,11 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) Packet packet(params[1]._data._buf, params[1]._data._len); try { vespalib::Gate gate; - domain->append(packet, make_shared<GateCallback>(gate)); - auto keep = domain->startCommit(make_shared<IgnoreCallback>()); + { + // Need to scope in order to drain out all the callbacks. + domain->append(packet, make_shared<GateCallback>(gate)); + auto keep = domain->startCommit(make_shared<IgnoreCallback>()); + } gate.await(); ret.AddInt32(0); ret.AddString("ok"); |