diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-26 21:30:47 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-26 21:30:47 +0200 |
commit | f6683a698da0a7478a00dcc57c9abf2dccc35f4a (patch) | |
tree | 60d8dae4397bdff0068e255b3e26efb8f4662ba8 | |
parent | 511057bb713a56f99a9b73e2ab3de998f72aee67 (diff) |
Revert "Revert "Revert "Revert "Revert "Balder/group operations to tls and commit in batches"""""
18 files changed, 32 insertions, 149 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp index 1f979d1566c..b04bac5ef26 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp @@ -7,7 +7,6 @@ namespace proton::documentmetastore { LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot) : _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()), - _allowEarlyAck(configSnapshot.getMaintenanceConfigSP()->allowEarlyAck()), _hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 || configSnapshot.getSchemaSP()->getNumAttributeFields() > 0) { @@ -19,7 +18,6 @@ LidReuseDelayerConfig::LidReuseDelayerConfig() LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in) : _visibilityDelay(visibilityDelay), - _allowEarlyAck(visibilityDelay > 1ms), _hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in) { } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h index c81a2ff399f..82dab433a22 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h @@ -15,7 +15,6 @@ class LidReuseDelayerConfig { private: vespalib::duration _visibilityDelay; - bool _allowEarlyAck; bool _hasIndexedOrAttributeFields; public: LidReuseDelayerConfig(); @@ -23,7 +22,6 @@ public: explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot); vespalib::duration visibilityDelay() const { return _visibilityDelay; } bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; } - bool allowEarlyAck() const { return _allowEarlyAck; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp index ed2202c830b..03dfd83a132 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp @@ -16,7 +16,6 @@ LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &docume : _writeService(writeService), _documentMetaStore(documentMetaStore), _immediateCommit(config.visibilityDelay() == vespalib::duration::zero()), - _allowEarlyAck(config.allowEarlyAck()), _config(config), _pendingLids() { diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h index ba407ab57f8..5f1de878b4a 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h @@ -27,7 +27,6 @@ class LidReuseDelayer searchcorespi::index::IThreadingService &_writeService; IStore &_documentMetaStore; const bool _immediateCommit; - const bool _allowEarlyAck; LidReuseDelayerConfig _config; std::vector<uint32_t> _pendingLids; // lids waiting for commit @@ -39,8 +38,7 @@ public: bool delayReuse(const std::vector<uint32_t> &lids); std::vector<uint32_t> getReuseLids(); - bool needImmediateCommit() const { return _immediateCommit; } - bool allowEarlyAck() const { return _allowEarlyAck; } + bool getImmediateCommit() const { return _immediateCommit; } const LidReuseDelayerConfig & getConfig() const { return _config; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index 5ed0ad7492c..4c0485baec6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -135,7 +135,6 @@ public: } vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); } - bool allowEarlyAck() const { return _visibilityDelay > 1ms; } const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const { return _lidSpaceCompaction; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 209a35ce4a2..37dfddf0c2c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -402,8 +402,6 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), - _numPendingCommit(0), - _numCommitsCompleted(0), _delayedPrune(false), _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), @@ -497,39 +495,11 @@ FeedHandler::getTransactionLogReplayDone() const { } void -FeedHandler::onCommitDone(uint64_t numPendingAtStart) { - assert(numPendingAtStart <= _numPendingCommit); - _numPendingCommit -= numPendingAtStart; - if (_numPendingCommit > 0) { - enqueCommitTask(); - } -} - -void FeedHandler::enqueCommitTask() { - _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); })); -} - -void -FeedHandler::initiateCommit() { - auto commitResult = _tlsWriter->startCommit(std::make_shared<OnCommitDone>( - _writeService.master(), - makeLambdaTask([this, numPendingAtStart=_numPendingCommit]() { - onCommitDone(numPendingAtStart); - }))); - if (_activeFeedView) { - _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlive<CommitResult>>(std::move(commitResult))); - } -} - -void FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } _tlsWriter->appendOperation(op, std::move(onDone)); - if (++_numPendingCommit == 1) { - enqueCommitTask(); - } } FeedHandler::CommitResult @@ -540,7 +510,7 @@ FeedHandler::startCommit(DoneCallback onDone) { void FeedHandler::storeOperationSync(const FeedOperation &op) { vespalib::Gate gate; - appendAndCommitOperation(op, make_shared<search::GateCallback>(gate)); + appendOperation(op, make_shared<search::GateCallback>(gate)); gate.await(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 97629bfc018..4807c596130 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -76,8 +76,6 @@ private: // the serial num of the last message in the transaction log SerialNum _serialNum; SerialNum _prunedSerialNum; - size_t _numPendingCommit; - size_t _numCommitsCompleted; bool _delayedPrune; mutable std::shared_mutex _feedLock; FeedStateSP _feedState; @@ -127,9 +125,6 @@ private: FeedStateSP getFeedState() const; void changeFeedState(FeedStateSP newState); void doChangeFeedState(FeedStateSP newState); - void onCommitDone(uint64_t numPendingAtStart); - void initiateCommit(); - void enqueCommitTask(); public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index c3b76a9db75..b276779c2ee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -22,10 +22,6 @@ struct IOperationStorer */ virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0; [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0; - void appendAndCommitOperation(const FeedOperation &op, DoneCallback onDone) { - appendOperation(op, onDone); - (void) startCommit(std::move(onDone)); - } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index 468850b4409..d423e095ad9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -82,7 +82,7 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); vespalib::Gate gate; - _opStorer.appendAndCommitOperation(op, std::make_shared<search::GateCallback>(gate)); + _opStorer.appendOperation(op, std::make_shared<search::GateCallback>(gate)); gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 35549f21471..37497eaa998 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -63,13 +63,13 @@ public: ~LidSpaceCompactionJob(); // Implements IDiskMemUsageListener - void notifyDiskMemUsage(DiskMemUsageState state) override; + virtual void notifyDiskMemUsage(DiskMemUsageState state) override; // Implements IClusterStateChangedNofifier - void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; // Implements IMaintenanceJob - bool run() override; + virtual bool run() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index d4b542a0af8..d94bb2e3d03 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -105,7 +105,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, AttributeUsageFilter &attributeUsageFilter) { controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig())); controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval())); - if (config.hasVisibilityDelay() && config.allowEarlyAck()) { + if (config.hasVisibilityDelay()) { controller.registerJobInMasterThread(std::make_unique<DocumentDBCommitJob>(commit, config.getVisibilityDelay())); } const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB()); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 9927a2d2ce0..217a3bb24d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -210,11 +210,11 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c } std::unique_ptr<PendingLidTrackerBase> -createUncommitedLidTracker(bool allowEarlyAck) { - if (allowEarlyAck) { - return std::make_unique<TwoPhasePendingLidTracker>(); - } else { +createUncommitedLidTracker(bool needImmediateCommit) { + if (needImmediateCommit) { return std::make_unique<PendingLidTracker>(); + } else { + return std::make_unique<TwoPhasePendingLidTracker>(); } } @@ -229,7 +229,7 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams _docType(nullptr), _lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig), _pendingLidsForDocStore(), - _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.allowEarlyAck())), + _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.getImmediateCommit())), _schema(ctx._schema), _writeService(ctx._writeService), _params(params), @@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp void StoreOnlyFeedView::considerEarlyAck(FeedToken & token) { - if ( _lidReuseDelayer.allowEarlyAck() && token) { + if ( ! needCommit() && token) { token.reset(); } } @@ -327,7 +327,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); if (putOp.getValidDbdId(_params._subDbId)) { - bool immediateCommit = needImmediateCommit(); + bool immediateCommit = needCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(std::move(token), std::move(uncommitted), @@ -345,8 +345,8 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) } bool -StoreOnlyFeedView::needImmediateCommit() const { - return _lidReuseDelayer.needImmediateCommit(); +StoreOnlyFeedView::needCommit() const { + return _lidReuseDelayer.getImmediateCommit(); } void @@ -483,7 +483,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid()); considerEarlyAck(token); - bool immediateCommit = needImmediateCommit(); + bool immediateCommit = needCommit(); auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate()); UpdateScope updateScope(*_schema, upd); updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope); @@ -657,7 +657,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token unc std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); - bool immediateCommit = needImmediateCommit(); + bool immediateCommit = needCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone); } @@ -770,7 +770,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp) void StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp) { - bool immediateCommit = needImmediateCommit(); + bool immediateCommit = needCommit(); size_t rm_count = removeDocuments(delOp, true, immediateCommit); LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)", _params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count); @@ -809,7 +809,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { - bool immediateCommit = needImmediateCommit(); + bool immediateCommit = needCommit(); const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()), diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 4569e01f9fd..7d91ea86a22 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -161,7 +161,7 @@ private: void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone); void heartBeatSummary(SerialNum serialNum); - bool needImmediateCommit() const; + bool needCommit() const; bool useDocumentStore(SerialNum replaySerialNum) const { diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index defce8c3421..540895b2404 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -15,7 +15,7 @@ basedir string default="tmp" restart ## Use fsync after each commit. ## If not the below interval is used. -usefsync bool default=false +usefsync bool default=false restart ##Number of threads available for visiting/subscription. maxthreads int default=4 restart diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index bd7feec0598..9e0f1a8a1aa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,12 +113,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } } -Domain::~Domain() { - MonitorGuard guard(_currentChunkMonitor); - guard.broadcast(); - commitChunk(grabCurrentChunk(guard), guard); - _singleCommitter->shutdown().sync(); -} +Domain::~Domain() { } DomainInfo Domain::getDomainInfo() const @@ -323,73 +318,22 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } -void -Domain::append(const Packet & packet, Writer::DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - if (_lastSerial >= packet.range().from()) { - throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", - packet.range().from(), _lastSerial)); - } else { - _lastSerial = packet.range().to(); - } - _currentChunk->add(packet, std::move(onDone)); - commitIfFull(guard); -} - Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - if ( !_currentChunk->empty() ) { - auto completed = grabCurrentChunk(guard); - completed->setCommitDoneCallback(std::move(onDone)); - CommitResult result(completed->createCommitResult()); - commitChunk(std::move(completed), guard); - return result; - } + (void) onDone; return CommitResult(); } void -Domain::commitIfFull(const vespalib::MonitorGuard &guard) { - if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { - auto completed = std::move(_currentChunk); - _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); - commitChunk(std::move(completed), guard); - } -} - -std::unique_ptr<CommitChunk> -Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); - auto chunk = std::move(_currentChunk); - _currentChunk = createCommitChunk(_config); - return chunk; -} - -void -Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { - assert(chunkOrderGuard.monitors(_currentChunkMonitor)); - _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { - doCommit(std::move(chunk)); - })); -} - -void -Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { - const Packet & packet = chunk->getPacket(); - if (packet.empty()) return; - +Domain::append(const Packet & packet, Writer::DoneCallback onDone) +{ + (void) onDone; vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); DomainPart::SP dp = optionallyRotateFile(entry.serial()); dp->commit(entry.serial(), packet); - if (_config.getFSyncOnCommit()) { - dp->sync(); - } cleanSessions(); - LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", - chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 041ec27cf23..7e77e6ef0ef 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -56,11 +56,6 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: - void commitIfFull(const vespalib::MonitorGuard & guard); - - std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); - void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); - void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index b7e02894e6b..8855183226d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding), + : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression _compressionLevel(compressionLevel), _lock(), _fileLock(), @@ -396,19 +396,16 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if (_range.from() == 0) { _range.from(firstSerial); } - IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); for (size_t i(0); h.size() > 0; i++) { //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { chunk->add(entry); - if (_encoding.getCompression() == Encoding::Compression::none) { - write(*_transLog, *chunk); - chunk = IChunk::create(_encoding, _compressionLevel); - } + write(*_transLog, *chunk); _sz++; _range.to(entry.serial()); } else { @@ -416,9 +413,6 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) entry.serial(), _range.to())); } } - if ( ! chunk->getEntries().empty()) { - write(*_transLog, *chunk); - } bool merged(false); LockGuard guard(_lock); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 0c0c9186e12..7be3dd708a5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -572,11 +572,8 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) Packet packet(params[1]._data._buf, params[1]._data._len); try { vespalib::Gate gate; - { - // Need to scope in order to drain out all the callbacks. - domain->append(packet, make_shared<GateCallback>(gate)); - auto keep = domain->startCommit(make_shared<IgnoreCallback>()); - } + domain->append(packet, make_shared<GateCallback>(gate)); + auto keep = domain->startCommit(make_shared<IgnoreCallback>()); gate.await(); ret.AddInt32(0); ret.AddString("ok"); |