From 48e6d31dbe64356cfb31e8dea7f36330fa479341 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 10 Dec 2021 12:18:17 +0000 Subject: Let accounting handle multiple commits in flight. --- .../vespa/searchcore/proton/server/feedhandler.cpp | 36 +++++++++++++--------- .../vespa/searchcore/proton/server/feedhandler.h | 6 +++- 2 files changed, 27 insertions(+), 15 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 0ac91870eab..209fe39b877 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -416,10 +416,12 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), - _replay_end_serial_num(0u), - _prepare_serial_num(0u), - _numOperationsPendingCommit(0), + _replay_end_serial_num(0), + _prepare_serial_num(0), + _numOperationsStarted(0), _numOperationsCompleted(0), + _numOperationsStartedAtLastCommitStart(0), + _numCommitsStarted(0), _numCommitsCompleted(0), _delayedPrune(false), _feedLock(), @@ -519,34 +521,39 @@ FeedHandler::getTransactionLogReplayDone() const { } void -FeedHandler::onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time) { - assert(numPendingAtStart <= _numOperationsPendingCommit); - _numOperationsPendingCommit -= numPendingAtStart; - _numOperationsCompleted += numPendingAtStart; +FeedHandler::onCommitDone(size_t numOperationsInCommit, vespalib::steady_time start_time) { + assert(_numCommitsStarted > _numCommitsCompleted); + assert(_numOperationsStarted >= _numOperationsCompleted + numOperationsInCommit); + _numOperationsCompleted += numOperationsInCommit; _numCommitsCompleted++; - if (_numOperationsPendingCommit > 0) { + if ((numOperationsInFlight() > 0) && numCommitsInFlight() == 0) { enqueCommitTask(); } LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu", - _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit); + _numCommitsCompleted, numOperationsInCommit, _numOperationsCompleted, numOperationsInFlight()); vespalib::steady_time now = vespalib::steady_clock::now(); auto latency = vespalib::to_s(now - start_time); std::lock_guard guard(_stats_lock); - _stats.add_commit(numPendingAtStart, latency); + _stats.add_commit(numOperationsInCommit, latency); } void FeedHandler::enqueCommitTask() { - _writeService.master().execute(makeLambdaTask([this, start_time(vespalib::steady_clock::now())]() { initiateCommit(start_time); })); + _writeService.master().execute(makeLambdaTask([this, start_time(vespalib::steady_clock::now())]() { + initiateCommit(start_time); + })); } void FeedHandler::initiateCommit(vespalib::steady_time start_time) { + size_t numOperationsSinceLastInitiatedCommit = _numOperationsStarted - _numOperationsStartedAtLastCommitStart; auto onCommitDoneContext = std::make_shared( _writeService.master(), - makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit, start_time]() { - onCommitDone(numPendingAtStart, start_time); + makeLambdaTask([this, numOperationsSinceLastInitiatedCommit, start_time]() { + onCommitDone(numOperationsSinceLastInitiatedCommit, start_time); })); auto commitResult = _tlsWriter->startCommit(onCommitDoneContext); + _numCommitsStarted++; + _numOperationsStartedAtLastCommitStart = _numOperationsStarted; if (_activeFeedView) { using KeepAlivePair = vespalib::KeepAlive>; auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext)); @@ -560,7 +567,8 @@ FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback on const_cast(op).setSerialNum(inc_serial_num()); } _tlsWriter->appendOperation(op, std::move(onDone)); - if (++_numOperationsPendingCommit == 1) { + ++_numOperationsStarted; + if (numOperationsInFlight() == 1) { enqueCommitTask(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 39d1f0f47fb..7b319779620 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -83,8 +83,10 @@ private: // the serial num of the last feed operation in the transaction log at startup before replay SerialNum _replay_end_serial_num; uint64_t _prepare_serial_num; - size_t _numOperationsPendingCommit; + size_t _numOperationsStarted; size_t _numOperationsCompleted; + size_t _numOperationsStartedAtLastCommitStart; + size_t _numCommitsStarted; size_t _numCommitsCompleted; bool _delayedPrune; mutable std::shared_mutex _feedLock; @@ -140,6 +142,8 @@ private: void onCommitDone(size_t numPendingAtStart, vespalib::steady_time start_time); void initiateCommit(vespalib::steady_time start_time); void enqueCommitTask(); + size_t numOperationsInFlight() const { return _numOperationsStarted - _numOperationsCompleted; } + size_t numCommitsInFlight() const { return _numCommitsStarted - _numCommitsCompleted; } public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; -- cgit v1.2.3