aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-25 23:30:56 +0200
committerGitHub <noreply@github.com>2020-09-25 23:30:56 +0200
commitee30152811a3f3131a951ace109e2ddde78482ef (patch)
treee6911627a70d7dd36b40b513432c9591a8f4bc3f
parent9c809992ed833d7a22e6e4d2cf87022387fe395d (diff)
parentb2659214ba1810ade0cb490c87835f1025240b33 (diff)
Merge pull request #14542 from vespa-engine/revert-14539-revert-14515-revert-14514-revert-14495-balder/group-operations-to-tls-and-commit-in-batches
Revert "Revert "Revert "Revert "Balder/group operations to tls and commit in batches""""
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp26
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h2
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp66
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp7
18 files changed, 149 insertions, 32 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
index b04bac5ef26..1f979d1566c 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
@@ -7,6 +7,7 @@ namespace proton::documentmetastore {
LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot)
: _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()),
+ _allowEarlyAck(configSnapshot.getMaintenanceConfigSP()->allowEarlyAck()),
_hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 ||
configSnapshot.getSchemaSP()->getNumAttributeFields() > 0)
{
@@ -18,6 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig()
LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in)
: _visibilityDelay(visibilityDelay),
+ _allowEarlyAck(visibilityDelay > 1ms),
_hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
index 82dab433a22..c81a2ff399f 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
@@ -15,6 +15,7 @@ class LidReuseDelayerConfig
{
private:
vespalib::duration _visibilityDelay;
+ bool _allowEarlyAck;
bool _hasIndexedOrAttributeFields;
public:
LidReuseDelayerConfig();
@@ -22,6 +23,7 @@ public:
explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot);
vespalib::duration visibilityDelay() const { return _visibilityDelay; }
bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; }
+ bool allowEarlyAck() const { return _allowEarlyAck; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
index 03dfd83a132..ed2202c830b 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
@@ -16,6 +16,7 @@ LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &docume
: _writeService(writeService),
_documentMetaStore(documentMetaStore),
_immediateCommit(config.visibilityDelay() == vespalib::duration::zero()),
+ _allowEarlyAck(config.allowEarlyAck()),
_config(config),
_pendingLids()
{
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
index 5f1de878b4a..ba407ab57f8 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
@@ -27,6 +27,7 @@ class LidReuseDelayer
searchcorespi::index::IThreadingService &_writeService;
IStore &_documentMetaStore;
const bool _immediateCommit;
+ const bool _allowEarlyAck;
LidReuseDelayerConfig _config;
std::vector<uint32_t> _pendingLids; // lids waiting for commit
@@ -38,7 +39,8 @@ public:
bool delayReuse(const std::vector<uint32_t> &lids);
std::vector<uint32_t> getReuseLids();
- bool getImmediateCommit() const { return _immediateCommit; }
+ bool needImmediateCommit() const { return _immediateCommit; }
+ bool allowEarlyAck() const { return _allowEarlyAck; }
const LidReuseDelayerConfig & getConfig() const { return _config; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
index 4c0485baec6..5ed0ad7492c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
@@ -135,6 +135,7 @@ public:
}
vespalib::duration getVisibilityDelay() const { return _visibilityDelay; }
bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); }
+ bool allowEarlyAck() const { return _visibilityDelay > 1ms; }
const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const {
return _lidSpaceCompaction;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 37dfddf0c2c..209a35ce4a2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -402,6 +402,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_tlsReplayProgress(),
_serialNum(0),
_prunedSerialNum(0),
+ _numPendingCommit(0),
+ _numCommitsCompleted(0),
_delayedPrune(false),
_feedLock(),
_feedState(make_shared<InitState>(getDocTypeName())),
@@ -495,11 +497,39 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
+FeedHandler::onCommitDone(uint64_t numPendingAtStart) {
+ assert(numPendingAtStart <= _numPendingCommit);
+ _numPendingCommit -= numPendingAtStart;
+ if (_numPendingCommit > 0) {
+ enqueCommitTask();
+ }
+}
+
+void FeedHandler::enqueCommitTask() {
+ _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); }));
+}
+
+void
+FeedHandler::initiateCommit() {
+ auto commitResult = _tlsWriter->startCommit(std::make_shared<OnCommitDone>(
+ _writeService.master(),
+ makeLambdaTask([this, numPendingAtStart=_numPendingCommit]() {
+ onCommitDone(numPendingAtStart);
+ })));
+ if (_activeFeedView) {
+ _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlive<CommitResult>>(std::move(commitResult)));
+ }
+}
+
+void
FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) {
if (!op.getSerialNum()) {
const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
_tlsWriter->appendOperation(op, std::move(onDone));
+ if (++_numPendingCommit == 1) {
+ enqueCommitTask();
+ }
}
FeedHandler::CommitResult
@@ -510,7 +540,7 @@ FeedHandler::startCommit(DoneCallback onDone) {
void
FeedHandler::storeOperationSync(const FeedOperation &op) {
vespalib::Gate gate;
- appendOperation(op, make_shared<search::GateCallback>(gate));
+ appendAndCommitOperation(op, make_shared<search::GateCallback>(gate));
gate.await();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 4807c596130..97629bfc018 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -76,6 +76,8 @@ private:
// the serial num of the last message in the transaction log
SerialNum _serialNum;
SerialNum _prunedSerialNum;
+ size_t _numPendingCommit;
+ size_t _numCommitsCompleted;
bool _delayedPrune;
mutable std::shared_mutex _feedLock;
FeedStateSP _feedState;
@@ -125,6 +127,9 @@ private:
FeedStateSP getFeedState() const;
void changeFeedState(FeedStateSP newState);
void doChangeFeedState(FeedStateSP newState);
+ void onCommitDone(uint64_t numPendingAtStart);
+ void initiateCommit();
+ void enqueCommitTask();
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
index b276779c2ee..c3b76a9db75 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
@@ -22,6 +22,10 @@ struct IOperationStorer
*/
virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0;
[[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0;
+ void appendAndCommitOperation(const FeedOperation &op, DoneCallback onDone) {
+ appendOperation(op, onDone);
+ (void) startCommit(std::move(onDone));
+ }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index d423e095ad9..468850b4409 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -82,7 +82,7 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats)
uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
vespalib::Gate gate;
- _opStorer.appendOperation(op, std::make_shared<search::GateCallback>(gate));
+ _opStorer.appendAndCommitOperation(op, std::make_shared<search::GateCallback>(gate));
gate.await();
_handler.handleCompactLidSpace(op);
EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
index 37497eaa998..35549f21471 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
@@ -63,13 +63,13 @@ public:
~LidSpaceCompactionJob();
// Implements IDiskMemUsageListener
- virtual void notifyDiskMemUsage(DiskMemUsageState state) override;
+ void notifyDiskMemUsage(DiskMemUsageState state) override;
// Implements IClusterStateChangedNofifier
- virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
+ void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
// Implements IMaintenanceJob
- virtual bool run() override;
+ bool run() override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index d94bb2e3d03..d4b542a0af8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -105,7 +105,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
AttributeUsageFilter &attributeUsageFilter) {
controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig()));
controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));
- if (config.hasVisibilityDelay()) {
+ if (config.hasVisibilityDelay() && config.allowEarlyAck()) {
controller.registerJobInMasterThread(std::make_unique<DocumentDBCommitJob>(commit, config.getVisibilityDelay()));
}
const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB());
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 217a3bb24d3..9927a2d2ce0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -210,11 +210,11 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c
}
std::unique_ptr<PendingLidTrackerBase>
-createUncommitedLidTracker(bool needImmediateCommit) {
- if (needImmediateCommit) {
- return std::make_unique<PendingLidTracker>();
- } else {
+createUncommitedLidTracker(bool allowEarlyAck) {
+ if (allowEarlyAck) {
return std::make_unique<TwoPhasePendingLidTracker>();
+ } else {
+ return std::make_unique<PendingLidTracker>();
}
}
@@ -229,7 +229,7 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams
_docType(nullptr),
_lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig),
_pendingLidsForDocStore(),
- _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.getImmediateCommit())),
+ _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.allowEarlyAck())),
_schema(ctx._schema),
_writeService(ctx._writeService),
_params(params),
@@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp
void
StoreOnlyFeedView::considerEarlyAck(FeedToken & token)
{
- if ( ! needCommit() && token) {
+ if ( _lidReuseDelayer.allowEarlyAck() && token) {
token.reset();
}
}
@@ -327,7 +327,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
if (putOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(std::move(token), std::move(uncommitted),
@@ -345,8 +345,8 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
}
bool
-StoreOnlyFeedView::needCommit() const {
- return _lidReuseDelayer.getImmediateCommit();
+StoreOnlyFeedView::needImmediateCommit() const {
+ return _lidReuseDelayer.needImmediateCommit();
}
void
@@ -483,7 +483,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid());
considerEarlyAck(token);
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate());
UpdateScope updateScope(*_schema, upd);
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope);
@@ -657,7 +657,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token unc
std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u),
std::move(moveDoneCtx));
removeSummary(serialNum, lid, onWriteDone);
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
removeAttributes(serialNum, lid, immediateCommit, onWriteDone);
removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone);
}
@@ -770,7 +770,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp)
void
StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp)
{
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
size_t rm_count = removeDocuments(delOp, true, immediateCommit);
LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)",
_params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count);
@@ -809,7 +809,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = needCommit();
+ bool immediateCommit = needImmediateCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()),
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 7d91ea86a22..4569e01f9fd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -161,7 +161,7 @@ private:
void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone);
void heartBeatSummary(SerialNum serialNum);
- bool needCommit() const;
+ bool needImmediateCommit() const;
bool useDocumentStore(SerialNum replaySerialNum) const {
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index 540895b2404..defce8c3421 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -15,7 +15,7 @@ basedir string default="tmp" restart
## Use fsync after each commit.
## If not the below interval is used.
-usefsync bool default=false restart
+usefsync bool default=false
##Number of threads available for visiting/subscription.
maxthreads int default=4 restart
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 9e0f1a8a1aa..bd7feec0598 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
}
-Domain::~Domain() { }
+Domain::~Domain() {
+ MonitorGuard guard(_currentChunkMonitor);
+ guard.broadcast();
+ commitChunk(grabCurrentChunk(guard), guard);
+ _singleCommitter->shutdown().sync();
+}
DomainInfo
Domain::getDomainInfo() const
@@ -318,22 +323,73 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
return dp;
}
+void
+Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if (_lastSerial >= packet.range().from()) {
+ throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
+ packet.range().from(), _lastSerial));
+ } else {
+ _lastSerial = packet.range().to();
+ }
+ _currentChunk->add(packet, std::move(onDone));
+ commitIfFull(guard);
+}
+
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- (void) onDone;
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if ( !_currentChunk->empty() ) {
+ auto completed = grabCurrentChunk(guard);
+ completed->setCommitDoneCallback(std::move(onDone));
+ CommitResult result(completed->createCommitResult());
+ commitChunk(std::move(completed), guard);
+ return result;
+ }
return CommitResult();
}
void
-Domain::append(const Packet & packet, Writer::DoneCallback onDone)
-{
- (void) onDone;
+Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
+ if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
+ auto completed = std::move(_currentChunk);
+ _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks());
+ commitChunk(std::move(completed), guard);
+ }
+}
+
+std::unique_ptr<CommitChunk>
+Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ auto chunk = std::move(_currentChunk);
+ _currentChunk = createCommitChunk(_config);
+ return chunk;
+}
+
+void
+Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
+ assert(chunkOrderGuard.monitors(_currentChunkMonitor));
+ _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
+ doCommit(std::move(chunk));
+ }));
+}
+
+void
+Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
+ const Packet & packet = chunk->getPacket();
+ if (packet.empty()) return;
+
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
DomainPart::SP dp = optionallyRotateFile(entry.serial());
dp->commit(entry.serial(), packet);
+ if (_config.getFSyncOnCommit()) {
+ dp->sync();
+ }
cleanSessions();
+ LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
+ chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 7e77e6ef0ef..041ec27cf23 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -56,6 +56,11 @@ public:
uint64_t size() const;
Domain & setConfig(const DomainConfig & cfg);
private:
+ void commitIfFull(const vespalib::MonitorGuard & guard);
+
+ std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
+ void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
+ void doCommit(std::unique_ptr<CommitChunk> chunk);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 8855183226d..b7e02894e6b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate)
DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate)
- : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression
+ : _encoding(encoding),
_compressionLevel(compressionLevel),
_lock(),
_fileLock(),
@@ -396,16 +396,19 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
if (_range.from() == 0) {
_range.from(firstSerial);
}
+ IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
for (size_t i(0); h.size() > 0; i++) {
//LOG(spam,
//"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
//h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
- IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
Packet::Entry entry;
entry.deserialize(h);
if (_range.to() < entry.serial()) {
chunk->add(entry);
- write(*_transLog, *chunk);
+ if (_encoding.getCompression() == Encoding::Compression::none) {
+ write(*_transLog, *chunk);
+ chunk = IChunk::create(_encoding, _compressionLevel);
+ }
_sz++;
_range.to(entry.serial());
} else {
@@ -413,6 +416,9 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
entry.serial(), _range.to()));
}
}
+ if ( ! chunk->getEntries().empty()) {
+ write(*_transLog, *chunk);
+ }
bool merged(false);
LockGuard guard(_lock);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 7be3dd708a5..0c0c9186e12 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -572,8 +572,11 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
vespalib::Gate gate;
- domain->append(packet, make_shared<GateCallback>(gate));
- auto keep = domain->startCommit(make_shared<IgnoreCallback>());
+ {
+ // Need to scope in order to drain out all the callbacks.
+ domain->append(packet, make_shared<GateCallback>(gate));
+ auto keep = domain->startCommit(make_shared<IgnoreCallback>());
+ }
gate.await();
ret.AddInt32(0);
ret.AddString("ok");