diff options
Diffstat (limited to 'searchcore')
4 files changed, 50 insertions, 6 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp index 9db1fee58e1..1f979d1566c 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp @@ -19,7 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig() LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in) : _visibilityDelay(visibilityDelay), - _allowEarlyAck(visibilityDelay > vespalib::duration::zero()), + _allowEarlyAck(visibilityDelay > 1ms), _hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index 388a1101dcf..5ed0ad7492c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -135,7 +135,7 @@ public: } vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); } - bool allowEarlyAck() const { return hasVisibilityDelay(); } + bool allowEarlyAck() const { return _visibilityDelay > 1ms; } const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const { return _lidSpaceCompaction; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 282ec3c9ace..75414052ef5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -93,18 +93,18 @@ TlsMgrWriter::sync(SerialNum syncTo) { for (int retryCount = 0; retryCount < 10; ++retryCount) { SerialNum syncedTo(0); - LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo); + LOG(debug, "Trying tls sync(%" PRIu64 ")", syncTo); bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo); if (!res) { - LOG(spam, "Tls sync failed, retrying"); + LOG(debug, "Tls sync failed, retrying"); sleep(1); continue; } if (syncedTo >= syncTo) { - LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo); + LOG(debug, "Tls sync complete, reached %" PRIu64", returning", syncedTo); return syncedTo; } - LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); + LOG(debug, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); } @@ -402,6 +402,9 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), + _numOperationsPendingCommit(0), + _numOperationsCompleted(0), + _numCommitsCompleted(0), _delayedPrune(false), _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), @@ -495,11 +498,46 @@ FeedHandler::getTransactionLogReplayDone() const { } void +FeedHandler::onCommitDone(size_t numPendingAtStart) { + assert(numPendingAtStart <= _numOperationsPendingCommit); + _numOperationsPendingCommit -= numPendingAtStart; + _numOperationsCompleted += numPendingAtStart; + _numCommitsCompleted++; + if (_numOperationsPendingCommit > 0) { + enqueCommitTask(); + } + LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu", + _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit); +} + +void FeedHandler::enqueCommitTask() { + _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); })); +} + +void +FeedHandler::initiateCommit() { + auto onCommitDoneContext = std::make_shared<OnCommitDone>( + _writeService.master(), + makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit]() { + onCommitDone(numPendingAtStart); + })); + auto commitResult = _tlsWriter->startCommit(onCommitDoneContext); + if (_activeFeedView) { + using KeepAlivePair = KeepAlive<std::pair<CommitResult, DoneCallback>>; + auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext)); + _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlivePair>(std::move(pair))); + } +} + +void FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } _tlsWriter->appendOperation(op, std::move(onDone)); + if (++_numOperationsPendingCommit == 1) { + enqueCommitTask(); + } } FeedHandler::CommitResult diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 4807c596130..c295b26a759 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -76,6 +76,9 @@ private: // the serial num of the last message in the transaction log SerialNum _serialNum; SerialNum _prunedSerialNum; + size_t _numOperationsPendingCommit; + size_t _numOperationsCompleted; + size_t _numCommitsCompleted; bool _delayedPrune; mutable std::shared_mutex _feedLock; FeedStateSP _feedState; @@ -125,6 +128,9 @@ private: FeedStateSP getFeedState() const; void changeFeedState(FeedStateSP newState); void doChangeFeedState(FeedStateSP newState); + void onCommitDone(size_t numPendingAtStart); + void initiateCommit(); + void enqueCommitTask(); public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; |