diff options
18 files changed, 149 insertions, 32 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp index b04bac5ef26..1f979d1566c 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp @@ -7,6 +7,7 @@ namespace proton::documentmetastore { LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot) : _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()), + _allowEarlyAck(configSnapshot.getMaintenanceConfigSP()->allowEarlyAck()), _hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 || configSnapshot.getSchemaSP()->getNumAttributeFields() > 0) { @@ -18,6 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig() LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in) : _visibilityDelay(visibilityDelay), + _allowEarlyAck(visibilityDelay > 1ms), _hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in) { } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h index 82dab433a22..c81a2ff399f 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h @@ -15,6 +15,7 @@ class LidReuseDelayerConfig { private: vespalib::duration _visibilityDelay; + bool _allowEarlyAck; bool _hasIndexedOrAttributeFields; public: LidReuseDelayerConfig(); @@ -22,6 +23,7 @@ public: explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot); vespalib::duration visibilityDelay() const { return _visibilityDelay; } bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; } + bool allowEarlyAck() const { return _allowEarlyAck; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp index 03dfd83a132..ed2202c830b 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp @@ -16,6 +16,7 @@ LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &docume : _writeService(writeService), _documentMetaStore(documentMetaStore), _immediateCommit(config.visibilityDelay() == vespalib::duration::zero()), + _allowEarlyAck(config.allowEarlyAck()), _config(config), _pendingLids() { diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h index 5f1de878b4a..ba407ab57f8 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h @@ -27,6 +27,7 @@ class LidReuseDelayer searchcorespi::index::IThreadingService &_writeService; IStore &_documentMetaStore; const bool _immediateCommit; + const bool _allowEarlyAck; LidReuseDelayerConfig _config; std::vector<uint32_t> _pendingLids; // lids waiting for commit @@ -38,7 +39,8 @@ public: bool delayReuse(const std::vector<uint32_t> &lids); std::vector<uint32_t> getReuseLids(); - bool getImmediateCommit() const { return _immediateCommit; } + bool needImmediateCommit() const { return _immediateCommit; } + bool allowEarlyAck() const { return _allowEarlyAck; } const LidReuseDelayerConfig & getConfig() const { return _config; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index 4c0485baec6..5ed0ad7492c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -135,6 +135,7 @@ public: } vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); } + bool allowEarlyAck() const { return _visibilityDelay > 1ms; } const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const { return _lidSpaceCompaction; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 37dfddf0c2c..209a35ce4a2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -402,6 +402,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), + _numPendingCommit(0), + _numCommitsCompleted(0), _delayedPrune(false), _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), @@ -495,11 +497,39 @@ FeedHandler::getTransactionLogReplayDone() const { } void +FeedHandler::onCommitDone(uint64_t numPendingAtStart) { + assert(numPendingAtStart <= _numPendingCommit); + _numPendingCommit -= numPendingAtStart; + if (_numPendingCommit > 0) { + enqueCommitTask(); + } +} + +void FeedHandler::enqueCommitTask() { + _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); })); +} + +void +FeedHandler::initiateCommit() { + auto commitResult = _tlsWriter->startCommit(std::make_shared<OnCommitDone>( + _writeService.master(), + makeLambdaTask([this, numPendingAtStart=_numPendingCommit]() { + onCommitDone(numPendingAtStart); + }))); + if (_activeFeedView) { + _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlive<CommitResult>>(std::move(commitResult))); + } +} + +void FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } _tlsWriter->appendOperation(op, std::move(onDone)); + if (++_numPendingCommit == 1) { + enqueCommitTask(); + } } FeedHandler::CommitResult @@ -510,7 +540,7 @@ FeedHandler::startCommit(DoneCallback onDone) { void FeedHandler::storeOperationSync(const FeedOperation &op) { vespalib::Gate gate; - appendOperation(op, make_shared<search::GateCallback>(gate)); + appendAndCommitOperation(op, make_shared<search::GateCallback>(gate)); gate.await(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 4807c596130..97629bfc018 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -76,6 +76,8 @@ private: // the serial num of the last message in the transaction log SerialNum _serialNum; SerialNum _prunedSerialNum; + size_t _numPendingCommit; + size_t _numCommitsCompleted; bool _delayedPrune; mutable std::shared_mutex _feedLock; FeedStateSP _feedState; @@ -125,6 +127,9 @@ private: FeedStateSP getFeedState() const; void changeFeedState(FeedStateSP newState); void doChangeFeedState(FeedStateSP newState); + void onCommitDone(uint64_t numPendingAtStart); + void initiateCommit(); + void enqueCommitTask(); public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index b276779c2ee..c3b76a9db75 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -22,6 +22,10 @@ struct IOperationStorer */ virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0; [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0; + void appendAndCommitOperation(const FeedOperation &op, DoneCallback onDone) { + appendOperation(op, onDone); + (void) startCommit(std::move(onDone)); + } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index d423e095ad9..468850b4409 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -82,7 +82,7 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); vespalib::Gate gate; - _opStorer.appendOperation(op, std::make_shared<search::GateCallback>(gate)); + _opStorer.appendAndCommitOperation(op, std::make_shared<search::GateCallback>(gate)); gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 37497eaa998..35549f21471 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -63,13 +63,13 @@ public: ~LidSpaceCompactionJob(); // Implements IDiskMemUsageListener - virtual void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyDiskMemUsage(DiskMemUsageState state) override; // Implements IClusterStateChangedNofifier - virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; // Implements IMaintenanceJob - virtual bool run() override; + bool run() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index d94bb2e3d03..d4b542a0af8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -105,7 +105,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, AttributeUsageFilter &attributeUsageFilter) { controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig())); controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval())); - if (config.hasVisibilityDelay()) { + if (config.hasVisibilityDelay() && config.allowEarlyAck()) { controller.registerJobInMasterThread(std::make_unique<DocumentDBCommitJob>(commit, config.getVisibilityDelay())); } const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB()); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 217a3bb24d3..9927a2d2ce0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -210,11 +210,11 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c } std::unique_ptr<PendingLidTrackerBase> -createUncommitedLidTracker(bool needImmediateCommit) { - if (needImmediateCommit) { - return std::make_unique<PendingLidTracker>(); - } else { +createUncommitedLidTracker(bool allowEarlyAck) { + if (allowEarlyAck) { return std::make_unique<TwoPhasePendingLidTracker>(); + } else { + return std::make_unique<PendingLidTracker>(); } } @@ -229,7 +229,7 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams _docType(nullptr), _lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig), _pendingLidsForDocStore(), - _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.getImmediateCommit())), + _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.allowEarlyAck())), _schema(ctx._schema), _writeService(ctx._writeService), _params(params), @@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp void StoreOnlyFeedView::considerEarlyAck(FeedToken & token) { - if ( ! needCommit() && token) { + if ( _lidReuseDelayer.allowEarlyAck() && token) { token.reset(); } } @@ -327,7 +327,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); if (putOp.getValidDbdId(_params._subDbId)) { - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(std::move(token), std::move(uncommitted), @@ -345,8 +345,8 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) } bool -StoreOnlyFeedView::needCommit() const { - return _lidReuseDelayer.getImmediateCommit(); +StoreOnlyFeedView::needImmediateCommit() const { + return _lidReuseDelayer.needImmediateCommit(); } void @@ -483,7 +483,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid()); considerEarlyAck(token); - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate()); UpdateScope updateScope(*_schema, upd); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope); @@ -657,7 +657,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token unc std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone); } @@ -770,7 +770,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp) void StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp) { - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); size_t rm_count = removeDocuments(delOp, true, immediateCommit); LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)", _params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count); @@ -809,7 +809,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { - bool immediateCommit = needCommit(); + bool immediateCommit = needImmediateCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()), diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 7d91ea86a22..4569e01f9fd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -161,7 +161,7 @@ private: void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone); void heartBeatSummary(SerialNum serialNum); - bool needCommit() const; + bool needImmediateCommit() const; bool useDocumentStore(SerialNum replaySerialNum) const { diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 540895b2404..defce8c3421 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -15,7 +15,7 @@ basedir string default="tmp" restart ## Use fsync after each commit. ## If not the below interval is used. -usefsync bool default=false restart +usefsync bool default=false ##Number of threads available for visiting/subscription. maxthreads int default=4 restart diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 9e0f1a8a1aa..bd7feec0598 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } } -Domain::~Domain() { } +Domain::~Domain() { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + commitChunk(grabCurrentChunk(guard), guard); + _singleCommitter->shutdown().sync(); +} DomainInfo Domain::getDomainInfo() const @@ -318,22 +323,73 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } +void +Domain::append(const Packet & packet, Writer::DoneCallback onDone) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (_lastSerial >= packet.range().from()) { + throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + commitIfFull(guard); +} + Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - (void) onDone; + vespalib::MonitorGuard guard(_currentChunkMonitor); + if ( !_currentChunk->empty() ) { + auto completed = grabCurrentChunk(guard); + completed->setCommitDoneCallback(std::move(onDone)); + CommitResult result(completed->createCommitResult()); + commitChunk(std::move(completed), guard); + return result; + } return CommitResult(); } void -Domain::append(const Packet & packet, Writer::DoneCallback onDone) -{ - (void) onDone; +Domain::commitIfFull(const vespalib::MonitorGuard &guard) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + auto completed = std::move(_currentChunk); + _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); + commitChunk(std::move(completed), guard); + } +} + +std::unique_ptr<CommitChunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = createCommitChunk(_config); + return chunk; +} + +void +Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { + doCommit(std::move(chunk)); + })); +} + +void +Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { + const Packet & packet = chunk->getPacket(); + if (packet.empty()) return; + vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); DomainPart::SP dp = optionallyRotateFile(entry.serial()); dp->commit(entry.serial(), packet); + if (_config.getFSyncOnCommit()) { + dp->sync(); + } cleanSessions(); + LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", + chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 7e77e6ef0ef..041ec27cf23 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -56,6 +56,11 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: + void commitIfFull(const vespalib::MonitorGuard & guard); + + std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 8855183226d..b7e02894e6b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression + : _encoding(encoding), _compressionLevel(compressionLevel), _lock(), _fileLock(), @@ -396,16 +396,19 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if (_range.from() == 0) { _range.from(firstSerial); } + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); for (size_t i(0); h.size() > 0; i++) { //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); - IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { chunk->add(entry); - write(*_transLog, *chunk); + if (_encoding.getCompression() == Encoding::Compression::none) { + write(*_transLog, *chunk); + chunk = IChunk::create(_encoding, _compressionLevel); + } _sz++; _range.to(entry.serial()); } else { @@ -413,6 +416,9 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) entry.serial(), _range.to())); } } + if ( ! chunk->getEntries().empty()) { + write(*_transLog, *chunk); + } bool merged(false); LockGuard guard(_lock); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 7be3dd708a5..0c0c9186e12 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -572,8 +572,11 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) Packet packet(params[1]._data._buf, params[1]._data._len); try { vespalib::Gate gate; - domain->append(packet, make_shared<GateCallback>(gate)); - auto keep = domain->startCommit(make_shared<IgnoreCallback>()); + { + // Need to scope in order to drain out all the callbacks. + domain->append(packet, make_shared<GateCallback>(gate)); + auto keep = domain->startCommit(make_shared<IgnoreCallback>()); + } gate.await(); ret.AddInt32(0); ret.AddString("ok"); |